diff --git a/doc/conf.py b/doc/conf.py index 30a55b9..a973ebc 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # pyLHC documentation build configuration file, created by # sphinx-quickstart on Tue Feb 6 12:10:18 2018. @@ -97,7 +96,7 @@ def about_package(init_posixpath: pathlib.Path) -> dict: # Override link in 'Edit on Github' rst_prolog = f""" -:github_url: {ABOUT_PYLHC['__url__']} +:github_url: {ABOUT_PYLHC["__url__"]} """ # The version info for the project you're documenting, acts as replacement for diff --git a/pylhc/__init__.py b/pylhc/__init__.py index 8c76dc1..bc95d96 100644 --- a/pylhc/__init__.py +++ b/pylhc/__init__.py @@ -7,6 +7,7 @@ :copyright: pyLHC/OMC-Team working group. :license: MIT, see the LICENSE.md file for details. """ + __title__ = "pylhc" __description__ = "An accelerator physics script collection for the OMC team at CERN." __url__ = "https://github.com/pylhc/pylhc" diff --git a/pylhc/bpm_calibration.py b/pylhc/bpm_calibration.py index 8e98fb4..611ed91 100644 --- a/pylhc/bpm_calibration.py +++ b/pylhc/bpm_calibration.py @@ -51,6 +51,7 @@ default: ``beta`` """ + from pathlib import Path import tfs @@ -74,33 +75,29 @@ def _get_params() -> dict: """ return EntryPointParameters( - inputdir=dict( - type=Path, - required=True, - help="Measurements path." - ), - outputdir=dict( - type=Path, - required=True, - help="Output directory where to write the calibration factors.", - ), - ips=dict( - type=int, - nargs="+", - choices=IPS, - required=False, - help="IPs to compute calibration factors for.", - ), - method=dict( - type=str, - required=False, - choices=METHODS, - default=METHODS[0], - help=( + inputdir={"type": Path, "required": True, "help": "Measurements path."}, + outputdir={ + "type": Path, + "required": True, + "help": "Output directory where to write the calibration factors.", + }, + ips={ + "type": int, + "nargs": "+", + "choices": IPS, + "required": False, + "help": "IPs to compute calibration factors for.", + }, + method={ + "type": str, + "required": False, + "choices": METHODS, + "default": METHODS[0], + "help": ( "Method to be used to compute the calibration factors. " "The Beta function is used by default." ), - ), + }, ) @@ -113,16 +110,18 @@ def main(opt): factors = get_calibration_factors_from_dispersion(opt.ips, opt.inputdir) # Fill NaN with 1 because of missing BPMs and that fit cannot be done everywhere - for plane in factors.keys(): - factors[plane] = factors[plane].fillna(1) - LOG.debug("".join([f"\nPlane {plane}:\n{factors[plane]}" for plane in factors.keys()])) + for plane in factors: + factors[plane] = factors[plane].infer_objects().fillna(1) + LOG.debug("".join([f"\nPlane {plane}:\n{factors[plane]}" for plane in factors])) # Write the TFS file to the desired output directory opt.outputdir.mkdir(parents=True, exist_ok=True) - for plane in factors.keys(): - tfs.write(opt.outputdir / f"{CALIBRATION_NAME[opt.method]}{plane.lower()}{EXT}", - factors[plane].reset_index(), - save_index=False) + for plane in factors: + tfs.write( + opt.outputdir / f"{CALIBRATION_NAME[opt.method]}{plane.lower()}{EXT}", + factors[plane].reset_index(), + save_index=False, + ) return factors diff --git a/pylhc/bsrt_analysis.py b/pylhc/bsrt_analysis.py index f2e73eb..9fc88ba 100644 --- a/pylhc/bsrt_analysis.py +++ b/pylhc/bsrt_analysis.py @@ -11,6 +11,7 @@ - If provided a `TfsDataFrame` file with timestamps, plots of the 2D distribution and comparison of fit parameters to cross sections are added. 
""" + import datetime import glob import gzip @@ -22,10 +23,10 @@ import pandas as pd import parse import pytz - import tfs from generic_parser import EntryPointParameters, entrypoint from omc3.utils import logging_tools, time_tools + from pylhc.constants.general import TFS_SUFFIX, TIME_COLUMN from pylhc.forced_da_analysis import get_approximate_index @@ -38,47 +39,52 @@ def get_params(): return EntryPointParameters( - directory=dict( - flags=["-d", "--directory"], - required=True, - type=str, - help="Directory containing the logged BSRT files.", - ), - beam=dict( - flags=["-b", "--beam"], - required=True, - choices=["B1", "B2"], - type=str, - help="Beam for which analysis is performed.", - ), - outputdir=dict( - flags=["-o", "--outputdir"], - type=str, - default=None, - help=( + directory={ + "flags": ["-d", "--directory"], + "required": True, + "type": str, + "help": "Directory containing the logged BSRT files.", + }, + beam={ + "flags": ["-b", "--beam"], + "required": True, + "choices": ["B1", "B2"], + "type": str, + "help": "Beam for which analysis is performed.", + }, + outputdir={ + "flags": ["-o", "--outputdir"], + "type": str, + "default": None, + "help": ( "Directory in which plots and dataframe will be saved in. If omitted, " "no data will be saved." ), - ), - starttime=dict( - flags=["--starttime"], - type=int, - help="Start of time window for analysis in milliseconds UTC.", - ), - endtime=dict( - flags=["--endtime"], - type=int, - help="End of time window for analysis in milliseconds UTC.", - ), - kick_df=dict( - flags=["--kick_df"], - default=None, - help=( + }, + starttime={ + "flags": ["--starttime"], + "type": int, + "help": "Start of time window for analysis in milliseconds UTC.", + }, + endtime={ + "flags": ["--endtime"], + "type": int, + "help": "End of time window for analysis in milliseconds UTC.", + }, + kick_df={ + "flags": ["--kick_df"], + "default": None, + "help": ( f"TFS with column {TIME_COLUMN} with time stamps to be added in the plots. 
" f"Additionally, cross section at these timestamps will be plotted.", ), - ), - show_plots=dict(flags=["--show_plots"], type=bool, default=False, help="Show BSRT plots."), + }, + show_plots={ + "flags": ["--show_plots"], + "type": bool, + "default": False, + "help": "Show BSRT plots.", + }, ) @@ -159,7 +165,7 @@ def _select_files(opt, files_df): def _load_files_in_df(opt): files_df = pd.DataFrame( - data={"FILES": glob.glob(str(Path(opt.directory) / _get_bsrt_logger_fname(opt.beam, "*")))} + data={"FILES": glob.glob(str(Path(opt.directory) / _get_bsrt_logger_fname(opt.beam, "*")))} # noqa: PTH207 ) files_df = files_df.assign( @@ -175,8 +181,7 @@ def _load_files_in_df(opt): ) files_df = files_df.assign(TIME=[f.timestamp() for f in files_df["TIMESTAMP"]]) - files_df = files_df.sort_values(by=["TIME"]).reset_index(drop=True).set_index("TIME") - return files_df + return files_df.sort_values(by=["TIME"]).reset_index(drop=True).set_index("TIME") def _get_timestamp_from_name(name, formatstring): @@ -189,7 +194,7 @@ def _get_timestamp_from_name(name, formatstring): def _check_and_fix_entries(entry): # pd.to_csv does not handle np.array as entries nicely, converting to list circumvents this for key, val in entry.items(): - if isinstance(val, (np.ndarray, tuple)): + if isinstance(val, (np.ndarray | tuple)): entry[key] = list(val) if np.array(val).size == 0: entry[key] = np.nan @@ -199,7 +204,8 @@ def _check_and_fix_entries(entry): def _load_pickled_data(opt, files_df): merged_df = pd.DataFrame() for bsrtfile in files_df["FILES"]: - data = pickle.load(gzip.open(bsrtfile, "rb")) + with gzip.open(bsrtfile, "rb") as f: + data = pickle.load(f) new_df = pd.DataFrame.from_records([_check_and_fix_entries(entry) for entry in data]) merged_df = pd.concat([merged_df, new_df], axis="index", ignore_index=True) @@ -224,7 +230,6 @@ def _add_kick_lines(ax, df): def _fit_var(ax, bsrt_df, plot_dict, opt): - ax[plot_dict["idx"]].plot( bsrt_df.index, [entry[plot_dict["fitidx"]] for entry in bsrt_df["lastFitResults"]] ) @@ -234,7 +239,6 @@ def _fit_var(ax, bsrt_df, plot_dict, opt): def plot_fit_variables(opt, bsrt_df): - fig, ax = plt.subplots(nrows=2, ncols=3, figsize=(20, 9), sharex=True, constrained_layout=True) plot_dicts = [ @@ -292,8 +296,8 @@ def _full_crossection(ax, bsrt_df, plot_dict, opt): ax, bsrt_df.reset_index(), "TimeIndex", - f'projPositionSet{plot_dict["idx"]}', - f'projDataSet{plot_dict["idx"]}', + f"projPositionSet{plot_dict['idx']}", + f"projDataSet{plot_dict['idx']}", ) ax.plot( bsrt_df.index, @@ -326,7 +330,6 @@ def _full_crossection(ax, bsrt_df, plot_dict, opt): def plot_full_crosssection(opt, bsrt_df): - plot_dicts = [ {"idx": 1, "fitresult": 3, "fiterror": 4, "title": "Horizontal Cross section"}, {"idx": 2, "fitresult": 8, "fiterror": 9, "title": "Vertical Cross section"}, @@ -344,7 +347,7 @@ def plot_full_crosssection(opt, bsrt_df): def _gauss(x, *p): a, b, c = p - return a * np.exp(-((x - b) ** 2) / (2.0 * c ** 2.0)) + return a * np.exp(-((x - b) ** 2) / (2.0 * c**2.0)) def _reshaped_imageset(df): @@ -408,7 +411,6 @@ def plot_crosssection_for_timesteps(opt, bsrt_df): def _aux_variables(ax, bsrt_df, plot_dict, opt): - ax.plot( bsrt_df.index, bsrt_df[plot_dict["variable1"]], color="red", label=plot_dict["variable1"] ) diff --git a/pylhc/bsrt_logger.py b/pylhc/bsrt_logger.py index 32ab95f..02eb04b 100644 --- a/pylhc/bsrt_logger.py +++ b/pylhc/bsrt_logger.py @@ -9,11 +9,13 @@ Original authors: E. H. Maclean, T. Persson and G. Trad. 
""" + import datetime as dt import os import pickle import sys import time +from pathlib import Path from omc3.definitions import formats from omc3.utils.mock import cern_network_import @@ -34,8 +36,7 @@ def parse_timestamp(thistime): ] for fmat in accepted_time_input_format: try: - dtobject = dt.datetime.strptime(thistime, fmat) - return dtobject + return dt.datetime.strptime(thistime, fmat) except ValueError: pass timefmatstring = "" @@ -53,22 +54,21 @@ def parse_timestamp(thistime): # function to help write output from datetime objects in standard format throughout code def convert_to_data_output_format(dtobject): - output_timestamp = dtobject.strftime(formats.TIME) - return output_timestamp + return dtobject.strftime(formats.TIME) ########################################## -if __name__ == '__main__': +if __name__ == "__main__": # Create a PyJapc instance with selector SCT.USER.ALL # INCA is automatically configured based on the timing domain you specify here CycleName = "LHC.USER.ALL" INCAacc = "LHC" - noSetFlag = True + no_set_flag = True - japc = pyjapc.PyJapc(selector=CycleName, incaAcceleratorName=INCAacc, noSet=noSetFlag) + japc = pyjapc.PyJapc(selector=CycleName, incaAcceleratorName=INCAacc, noSet=no_set_flag) japc.rbacLogin() acquesitions_per_file = 100 j = 0 @@ -79,8 +79,8 @@ def convert_to_data_output_format(dtobject): B1_image = japc.getParam("LHC.BSRTS.5R4.B1/Image") B2_image = japc.getParam("LHC.BSRTS.5L4.B2/Image") if t == 0: - allB1data = [] - allB2data = [] + all_b1_data = [] + all_b2_data = [] B1_IMGtime = B1_image["acqTime"] B2_IMGtime = B2_image["acqTime"] B1_IMGtime_dt = parse_timestamp(B1_IMGtime) @@ -88,19 +88,16 @@ def convert_to_data_output_format(dtobject): B1_IMGtime_st = convert_to_data_output_format(B1_IMGtime_dt) B2_IMGtime_st = convert_to_data_output_format(B2_IMGtime_dt) - allB1data.append(B1_image) - allB2data.append(B2_image) + all_b1_data.append(B1_image) + all_b2_data.append(B2_image) t += 1 if t == acquesitions_per_file: j += 1 f1name = "data_BSRT_B1_" + B1_IMGtime_st + ".dat" f2name = "data_BSRT_B2_" + B2_IMGtime_st + ".dat" - f1 = open(f1name, "wb") - f2 = open(f2name, "wb") - pickle.dump(allB1data, f1) - pickle.dump(allB2data, f2) - f1.close() - f2.close() + with Path(f1name).open("wb") as f1, Path(f2name).open("wb") as f2: + pickle.dump(all_b1_data, f1) + pickle.dump(all_b2_data, f2) os.system("gzip " + f1name) os.system("gzip " + f2name) t = 0 diff --git a/pylhc/calibration/beta.py b/pylhc/calibration/beta.py index 427760d..58653c3 100644 --- a/pylhc/calibration/beta.py +++ b/pylhc/calibration/beta.py @@ -7,8 +7,9 @@ to be used with the script `bpm_calibration.py`. 
""" + +from collections.abc import Sequence from pathlib import Path -from typing import Dict, Sequence, Tuple import numpy as np import pandas as pd @@ -50,9 +51,9 @@ def err_function(x, popt, pcov): sa, sb, sab = pcov[0, 0], pcov[1, 1], pcov[0, 1] a, b = popt[0], popt[1] - beta_err = ((a ** 2 - (x - b) ** 2) / a ** 2) ** 2 * sa + beta_err = ((a**2 - (x - b) ** 2) / a**2) ** 2 * sa beta_err += 4 * ((x - b) / a) ** 2 * sb - beta_err -= 4 * (x - b) * (a ** 2 - (x - b) ** 2) / a ** 3 * sab + beta_err -= 4 * (x - b) * (a**2 - (x - b) ** 2) / a**3 * sab return beta_err positions = beta_phase_tfs.reindex(bpms)[f"{S}"] @@ -60,7 +61,7 @@ def err_function(x, popt, pcov): beta_phase_err = beta_phase_tfs.reindex(bpms)[f"{ERR}{BETA}{plane}"] # Get the rough IP position and beta star for the initial values - ip_position = (positions[-1] - positions[0]) / 2 + ip_position = (positions.iloc[-1] - positions.iloc[0]) / 2 initial_values = (BETA_STAR_ESTIMATION, ip_position) # Get the curve fit for the expected parabola @@ -81,7 +82,7 @@ def err_function(x, popt, pcov): ) # Get the error from the covariance matrix - perr = np.sqrt(np.diag(pcov)) + _perr = np.sqrt(np.diag(pcov)) # not used? # Get the fitted beta and add the errors to get min/max values beta_fit = beta_function(positions[valid], *popt) @@ -118,8 +119,8 @@ def _get_factors_from_phase( factors = np.sqrt(beta_phase / beta_amp) # Now compute the errors - calibration_error = (beta_phase_err ** 2) / (4 * beta_amp * beta_phase) - calibration_error += (beta_phase * (beta_amp_err ** 2)) / (4 * (beta_amp ** 3)) + calibration_error = (beta_phase_err**2) / (4 * beta_amp * beta_phase) + calibration_error += (beta_phase * (beta_amp_err**2)) / (4 * (beta_amp**3)) calibration_error = np.sqrt(calibration_error) return pd.DataFrame({LABELS[1]: factors, LABELS[2]: calibration_error}) @@ -130,7 +131,7 @@ def _get_factors_from_phase_fit( beta_amp_tfs: pd.DataFrame, ips: Sequence[int], plane: str, -) -> Tuple[pd.Series, pd.Series]: +) -> tuple[pd.Series, pd.Series]: """ This function computes the calibration factors for the beta method with the beta from phase fit values. The associated error is also calculated. @@ -150,7 +151,7 @@ def _get_factors_from_phase_fit( # Get the beam concerned by those tfs files beam = int(beta_phase_tfs.iloc[0].name[-1]) - calibration_phase_fit, calibration_phase_fit_err = None, None + calibration_phase_fit, _calibration_phase_fit_err = None, None for ip in ips: LOG.info(f" Computing the calibration factors from phase fit for IP {ip}") @@ -183,7 +184,9 @@ def _get_factors_from_phase_fit( return calibration_phase_fit -def get_calibration_factors_from_beta(ips: Sequence[int], input_path: Path) -> Dict[str, pd.DataFrame]: +def get_calibration_factors_from_beta( + ips: Sequence[int], input_path: Path +) -> dict[str, pd.DataFrame]: """ This function is the main function to compute the calibration factors for the beta method. 
@@ -214,20 +217,24 @@ def get_calibration_factors_from_beta(ips: Sequence[int], input_path: Path) -> D """ LOG.info("Computing the calibration factors via beta") # Loop over each plane and compute the calibration factors - calibration_factors = dict() + calibration_factors = {} for plane in PLANES: LOG.info(f" Computing the calibration factors for plane {plane}") # Load the tfs files for beta from phase and beta from amp beta_phase_tfs = tfs.read(input_path / f"{BETA_NAME}{plane.lower()}{EXT}", index=TFS_INDEX) - beta_amp_tfs = tfs.read(input_path / f"{AMP_BETA_NAME}{plane.lower()}{EXT}", index=TFS_INDEX) + beta_amp_tfs = tfs.read( + input_path / f"{AMP_BETA_NAME}{plane.lower()}{EXT}", index=TFS_INDEX + ) # Get the calibration factors from phase calibration_phase = _get_factors_from_phase(beta_phase_tfs, beta_amp_tfs, plane) # Calibration from phase fit can only be obtained via ballistic optics if ips is not None: - calibration_phase_fit = _get_factors_from_phase_fit(beta_phase_tfs, beta_amp_tfs, ips, plane) + calibration_phase_fit = _get_factors_from_phase_fit( + beta_phase_tfs, beta_amp_tfs, ips, plane + ) else: calibration_phase_fit = pd.DataFrame(columns=(LABELS[3], LABELS[4])) diff --git a/pylhc/calibration/dispersion.py b/pylhc/calibration/dispersion.py index b641dd8..28350aa 100644 --- a/pylhc/calibration/dispersion.py +++ b/pylhc/calibration/dispersion.py @@ -7,37 +7,24 @@ intended to be used with the script `bpm_calibration.py`. """ + from pathlib import Path -from scipy.optimize import curve_fit + import numpy as np import pandas as pd -from typing import Tuple, List, Dict - -from omc3.utils import logging_tools -from omc3.optics_measurements.constants import ( - ERR, - EXT, - NORM_DISP_NAME, - DISPERSION_NAME, - S, -) - -from pylhc.constants.calibration import ( - BPMS, - D_BPMS, - D, - LABELS, - TFS_INDEX, -) import tfs +from omc3.optics_measurements.constants import DISPERSION_NAME, ERR, EXT, NORM_DISP_NAME, S +from omc3.utils import logging_tools +from scipy.optimize import curve_fit +from pylhc.constants.calibration import BPMS, D_BPMS, LABELS, TFS_INDEX, D LOG = logging_tools.get_logger(__name__) def _get_dispersion_fit( positions: pd.Series, dispersion_values: pd.Series, dispersion_err: pd.Series -) -> Tuple[pd.Series, pd.Series]: +) -> tuple[pd.Series, pd.Series]: """ This function returns a fit of the given dispersion values along with the associated error. @@ -68,20 +55,18 @@ def dispersion_function(x, a, b): # Get the fitted beta and add the errors to get min/max values dispersion_fit = dispersion_function(positions, fit[0], fit[1]) - dispersion_max_fit = dispersion_function( - positions, fit[0] + fit_err[0], fit[1] + fit_err[1] - ) - dispersion_min_fit = dispersion_function( - positions, fit[0] - fit_err[0], fit[1] - fit_err[1] - ) + dispersion_max_fit = dispersion_function(positions, fit[0] + fit_err[0], fit[1] + fit_err[1]) + dispersion_min_fit = dispersion_function(positions, fit[0] - fit_err[0], fit[1] - fit_err[1]) dispersion_fit_err = (dispersion_max_fit - dispersion_min_fit) / 2 return dispersion_fit, dispersion_fit_err def _get_factors_from_dispersion( - dispersion: Dict[str, pd.Series], phase: str, phase_err: str, -) -> Tuple[pd.Series, pd.Series]: + dispersion: dict[str, pd.Series], + phase: str, + phase_err: str, +) -> tuple[pd.Series, pd.Series]: """ This function computes the calibration factors for the dispersion method with the non fitted dispersion values. 
The associated error is also @@ -115,8 +100,8 @@ def _get_factors_from_dispersion( def get_calibration_factors_from_dispersion( - ips: List[int], input_path: Path -) -> Dict[str, pd.DataFrame]: + ips: list[int], input_path: Path +) -> dict[str, pd.DataFrame]: """ This function is the main function to compute the calibration factors for the dispersion method. @@ -148,29 +133,29 @@ def get_calibration_factors_from_dispersion( """ LOG.info("Computing the calibration factors via dispersion") # Load the normalized dispersion tfs file - norm_dispersion_tfs = tfs.read( - input_path / f"{NORM_DISP_NAME}x{EXT}", index=TFS_INDEX - ) + norm_dispersion_tfs = tfs.read(input_path / f"{NORM_DISP_NAME}x{EXT}", index=TFS_INDEX) dispersion_tfs = tfs.read(input_path / f"{DISPERSION_NAME}x{EXT}", index=TFS_INDEX) # Get the beam concerned by those tfs files beam = int(dispersion_tfs.iloc[0].name[-1]) # Loop over the IPs and compute the calibration factors - calibration_factors = dict() + calibration_factors = {} for ip in ips: LOG.info(f" Computing the calibration factors for IP {ip}, plane X") # Filter our TFS files to only keep the BPMs for the selected IR bpms = dispersion_tfs.reindex(BPMS[ip][beam]) d_bpms = dispersion_tfs.reindex(D_BPMS[ip][beam]) - + # Check for possible missing bpms for bpm_set in [bpms, d_bpms]: missing = set(bpm_set.loc[bpm_set.isnull().values].index) if missing: - LOG.warning(" One or several BPMs are missing in the input" - " DataFrame, the calibration factors calculation" - f"from fit may not be accurate: {missing}") + LOG.warning( + " One or several BPMs are missing in the input" + " DataFrame, the calibration factors calculation" + f"from fit may not be accurate: {missing}" + ) # Get the positions of the BPMs and the subset used for the fit bpms = bpms.index @@ -179,8 +164,8 @@ def get_calibration_factors_from_dispersion( positions_fit = dispersion_tfs.reindex(d_bpms)[S] # Get the dispersion and dispersion from phase from the tfs files - dispersion = dict() - normalised_dispersion = dict() + dispersion = {} + # normalised_dispersion = dict() dispersion["amp"] = dispersion_tfs.reindex(bpms)["DX"] dispersion["amp_err"] = dispersion_tfs.reindex(bpms)[f"{ERR}{D}X"] @@ -190,7 +175,8 @@ def get_calibration_factors_from_dispersion( # Compute the calibration factors using the dispersion from phase and amp calibration, calibration_err = _get_factors_from_dispersion( - dispersion, "phase", "phase_err") + dispersion, "phase", "phase_err" + ) # Fit the dispersion from phase dispersion["phase_fit"], dispersion["phase_fit_err"] = _get_dispersion_fit( @@ -216,9 +202,11 @@ def get_calibration_factors_from_dispersion( factors_for_ip.columns = LABELS factors_for_ip.index.name = TFS_INDEX - if "X" not in calibration_factors.keys(): + if "X" not in calibration_factors: calibration_factors = {"X": factors_for_ip} else: - calibration_factors["X"] = pd.concat([calibration_factors["X"], factors_for_ip], axis="index") + calibration_factors["X"] = pd.concat( + [calibration_factors["X"], factors_for_ip], axis="index" + ) return calibration_factors diff --git a/pylhc/constants/calibration.py b/pylhc/constants/calibration.py index ca075a4..996eb58 100644 --- a/pylhc/constants/calibration.py +++ b/pylhc/constants/calibration.py @@ -9,125 +9,141 @@ IPS = [1, 4, 5] # Constants for TFS files -LABELS = ['S', - 'CALIBRATION', - 'ERROR_CALIBRATION', - 'CALIBRATION_FIT', - 'ERROR_CALIBRATION_FIT'] -TFS_INDEX = 'NAME' -D = 'D' -ND = 'ND' +LABELS = ["S", "CALIBRATION", "ERROR_CALIBRATION", "CALIBRATION_FIT", 
"ERROR_CALIBRATION_FIT"] +TFS_INDEX = "NAME" +D = "D" +ND = "ND" # Estimation for the curve fit BETA_STAR_ESTIMATION = 200 # Methods to be used to compulte the calibration factors -METHODS = ('beta', 'dispersion') +METHODS = ("beta", "dispersion") # File name prefix for calibration output # end result example: {'beta': 'calibration_beta_.tfs', 'dispersion' ... } -CALIBRATION_NAME = {m: f'calibration_{m}_' for m in METHODS} +CALIBRATION_NAME = {m: f"calibration_{m}_" for m in METHODS} # Define BPMs to be used for a combination of IP and Beam -BPMS = {1: {1: ['BPMR.5L1.B1', - 'BPMYA.4L1.B1', - 'BPMWB.4L1.B1', - 'BPMSY.4L1.B1', - 'BPMS.2L1.B1', - 'BPMSW.1L1.B1', - 'BPMSW.1R1.B1', - 'BPMS.2R1.B1', - 'BPMSY.4R1.B1', - 'BPMWB.4R1.B1', - 'BPMYA.4R1.B1'], - 2: ['BPM.5L1.B2', - 'BPMYA.4L1.B2', - 'BPMWB.4L1.B2', - 'BPMSY.4L1.B2', - 'BPMS.2L1.B2', - 'BPMSW.1L1.B2', - 'BPMSW.1R1.B2', - 'BPMS.2R1.B2', - 'BPMSY.4R1.B2', - 'BPMWB.4R1.B2', - 'BPMYA.4R1.B2'] - }, - 4: {1: [ - 'BPMYA.5L4.B1', - 'BPMWI.A5L4.B1', - 'BPMWA.B5L4.B1', - 'BPMWA.A5L4.B1', - 'BPMWA.A5R4.B1', - 'BPMWA.B5R4.B1', - 'BPMYB.5R4.B1', - 'BPMYA.6R4.B1', - ], - 2: [ - 'BPMYB.5L4.B2', - 'BPMWA.B5L4.B2', - 'BPMWA.A5L4.B2', - 'BPMWA.A5R4.B2', - 'BPMWA.B5R4.B2', - 'BPMWI.A5R4.B2', - 'BPMYA.5R4.B2', - 'BPMYB.6R4.B2' - ] - }, - 5: {1: ['BPMYA.4L5.B1', - 'BPMWB.4L5.B1', - 'BPMSY.4L5.B1', - 'BPMS.2L5.B1', - 'BPMSW.1L5.B1', - 'BPMSW.1R5.B1', - 'BPMS.2R5.B1', - 'BPMSY.4R5.B1', - 'BPMWB.4R5.B1', - 'BPMYA.4R5.B1', - 'BPM.5R5.B1'], - 2: ['BPMYA.4L5.B2', - 'BPMWB.4L5.B2', - 'BPMSY.4L5.B2', - 'BPMS.2L5.B2', - 'BPMSW.1L5.B2', - 'BPMSW.1R5.B2', - 'BPMS.2R5.B2', - 'BPMSY.4R5.B2', - 'BPMWB.4R5.B2', - 'BPMYA.4R5.B2', - 'BPMR.5R5.B2'] - } - } +BPMS = { + 1: { + 1: [ + "BPMR.5L1.B1", + "BPMYA.4L1.B1", + "BPMWB.4L1.B1", + "BPMSY.4L1.B1", + "BPMS.2L1.B1", + "BPMSW.1L1.B1", + "BPMSW.1R1.B1", + "BPMS.2R1.B1", + "BPMSY.4R1.B1", + "BPMWB.4R1.B1", + "BPMYA.4R1.B1", + ], + 2: [ + "BPM.5L1.B2", + "BPMYA.4L1.B2", + "BPMWB.4L1.B2", + "BPMSY.4L1.B2", + "BPMS.2L1.B2", + "BPMSW.1L1.B2", + "BPMSW.1R1.B2", + "BPMS.2R1.B2", + "BPMSY.4R1.B2", + "BPMWB.4R1.B2", + "BPMYA.4R1.B2", + ], + }, + 4: { + 1: [ + "BPMYA.5L4.B1", + "BPMWI.A5L4.B1", + "BPMWA.B5L4.B1", + "BPMWA.A5L4.B1", + "BPMWA.A5R4.B1", + "BPMWA.B5R4.B1", + "BPMYB.5R4.B1", + "BPMYA.6R4.B1", + ], + 2: [ + "BPMYB.5L4.B2", + "BPMWA.B5L4.B2", + "BPMWA.A5L4.B2", + "BPMWA.A5R4.B2", + "BPMWA.B5R4.B2", + "BPMWI.A5R4.B2", + "BPMYA.5R4.B2", + "BPMYB.6R4.B2", + ], + }, + 5: { + 1: [ + "BPMYA.4L5.B1", + "BPMWB.4L5.B1", + "BPMSY.4L5.B1", + "BPMS.2L5.B1", + "BPMSW.1L5.B1", + "BPMSW.1R5.B1", + "BPMS.2R5.B1", + "BPMSY.4R5.B1", + "BPMWB.4R5.B1", + "BPMYA.4R5.B1", + "BPM.5R5.B1", + ], + 2: [ + "BPMYA.4L5.B2", + "BPMWB.4L5.B2", + "BPMSY.4L5.B2", + "BPMS.2L5.B2", + "BPMSW.1L5.B2", + "BPMSW.1R5.B2", + "BPMS.2R5.B2", + "BPMSY.4R5.B2", + "BPMWB.4R5.B2", + "BPMYA.4R5.B2", + "BPMR.5R5.B2", + ], + }, +} # For the dispersion method, only a subject of the BPMs is used # Same as BPM: IP and then beam -D_BPMS = {1: {1: ['BPMSY.4L1.B1', - 'BPMS.2L1.B1', - 'BPMSW.1L1.B1', - 'BPMSW.1R1.B1', - 'BPMS.2R1.B1', - 'BPMSY.4R1.B1'], - 2: ['BPMSY.4L1.B2', - 'BPMS.2L1.B2', - 'BPMSW.1L1.B2', - 'BPMSW.1R1.B2', - 'BPMS.2R1.B2', - 'BPMSY.4R1.B2'] - }, - 4: {1: [], - 2: [] - }, - 5: {1: ['BPMSY.4L5.B1', - 'BPMS.2L5.B1', - 'BPMSW.1L5.B1', - 'BPMSW.1R5.B1', - 'BPMS.2R5.B1', - 'BPMSY.4R5.B1', - ], - 2: ['BPMSY.4L5.B2', - 'BPMS.2L5.B2', - 'BPMSW.1L5.B2', - 'BPMSW.1R5.B2', - 'BPMS.2R5.B2', - 'BPMSY.4R5.B2'] - } - } +D_BPMS = { + 1: { + 1: [ + "BPMSY.4L1.B1", + 
"BPMS.2L1.B1", + "BPMSW.1L1.B1", + "BPMSW.1R1.B1", + "BPMS.2R1.B1", + "BPMSY.4R1.B1", + ], + 2: [ + "BPMSY.4L1.B2", + "BPMS.2L1.B2", + "BPMSW.1L1.B2", + "BPMSW.1R1.B2", + "BPMS.2R1.B2", + "BPMSY.4R1.B2", + ], + }, + 4: {1: [], 2: []}, + 5: { + 1: [ + "BPMSY.4L5.B1", + "BPMS.2L5.B1", + "BPMSW.1L5.B1", + "BPMSW.1R5.B1", + "BPMS.2R5.B1", + "BPMSY.4R5.B1", + ], + 2: [ + "BPMSY.4L5.B2", + "BPMS.2L5.B2", + "BPMSW.1L5.B2", + "BPMSW.1R5.B2", + "BPMS.2R5.B2", + "BPMSY.4R5.B2", + ], + }, +} diff --git a/pylhc/constants/forced_da_analysis.py b/pylhc/constants/forced_da_analysis.py index 6f909a4..f2dd3f8 100644 --- a/pylhc/constants/forced_da_analysis.py +++ b/pylhc/constants/forced_da_analysis.py @@ -5,6 +5,7 @@ Specific constants relating to the forced DA analysis to be used in ``PyLHC``, to help with consistency. """ + from pylhc.constants.general import PLANE_TO_HV, TFS_SUFFIX RESULTS_DIR = "forced_da_analysis" diff --git a/pylhc/constants/general.py b/pylhc/constants/general.py index 483afd0..181e8dd 100644 --- a/pylhc/constants/general.py +++ b/pylhc/constants/general.py @@ -4,14 +4,24 @@ General constants to be used in ``PyLHC``, to help with consistency. """ + import numpy as np BEAMS = (1, 2) PLANES = ("X", "Y") -PLANE_TO_HV = dict(X="H", Y="V") - -UNIT_TO_M = dict(km=1e3, m=1e0, mm=1e-3, um=1e-6, nm=1e-9, pm=1e-12, fm=1e-15, am=1e-18) +PLANE_TO_HV = {"X": "H", "Y": "V"} + +UNIT_TO_M = { + "km": 1e3, + "m": 1e0, + "mm": 1e-3, + "um": 1e-6, + "nm": 1e-9, + "pm": 1e-12, + "fm": 1e-15, + "am": 1e-18, +} PROTON_MASS = 0.938272 # GeV/c^2 LHC_NOMINAL_EMITTANCE = 3.75 * 1e-6 # Design LHC @@ -26,5 +36,5 @@ def get_proton_gamma(energy): def get_proton_beta(energy): - """ Returns relativistic beta for protons """ + """Returns relativistic beta for protons""" return np.sqrt(1 - (1 / get_proton_gamma(energy) ** 2)) diff --git a/pylhc/constants/kickgroups.py b/pylhc/constants/kickgroups.py index 6047cf2..5688225 100644 --- a/pylhc/constants/kickgroups.py +++ b/pylhc/constants/kickgroups.py @@ -4,6 +4,7 @@ Constants used in the KickGroups """ + from pathlib import Path KICKGROUPS_ROOT = Path("/user/slops/data/LHC_DATA/OP_DATA/Betabeat/KickGroups/MULTITURN_ACQ_GROUPS") @@ -31,6 +32,26 @@ BEAMPROCESS = "BEAMPROCESS" BEAM = "BEAM" -KICK_COLUMNS = [UTCTIME, LOCALTIME, TUNEX, TUNEY, DRIVEN_TUNEX, DRIVEN_TUNEY, DRIVEN_TUNEZ, AMPX, AMPY, AMPZ, TURNS, BUNCH, SDDS, JSON_FILE, BEAM, FILL, OPTICS, OPTICS_URI, BEAMPROCESS] +KICK_COLUMNS = [ + UTCTIME, + LOCALTIME, + TUNEX, + TUNEY, + DRIVEN_TUNEX, + DRIVEN_TUNEY, + DRIVEN_TUNEZ, + AMPX, + AMPY, + AMPZ, + TURNS, + BUNCH, + SDDS, + JSON_FILE, + BEAM, + FILL, + OPTICS, + OPTICS_URI, + BEAMPROCESS, +] COLUMNS_TO_HEADERS = [BEAM, FILL, BUNCH, TURNS, BEAMPROCESS, OPTICS, OPTICS_URI] -KICK_GROUP_COLUMNS = [UTCTIME, LOCALTIME, KICKGROUP, TIMESTAMP] \ No newline at end of file +KICK_GROUP_COLUMNS = [UTCTIME, LOCALTIME, KICKGROUP, TIMESTAMP] diff --git a/pylhc/constants/machine_settings_info.py b/pylhc/constants/machine_settings_info.py index acd6378..f42a0fb 100644 --- a/pylhc/constants/machine_settings_info.py +++ b/pylhc/constants/machine_settings_info.py @@ -5,6 +5,7 @@ Specific constants relating to the retrieval of machine settings information to be used in ``PyLHC``, to help with consistency. 
""" + from pylhc.constants.general import TFS_SUFFIX # TFS-File Conventions ######################################################### diff --git a/pylhc/data_extract/lsa.py b/pylhc/data_extract/lsa.py index dff88dd..27177b2 100644 --- a/pylhc/data_extract/lsa.py +++ b/pylhc/data_extract/lsa.py @@ -4,23 +4,25 @@ This module provides useful functions to conveniently wrap the functionality of ``pjlsa``. """ -import jpype + import logging import re +from collections.abc import Callable + +import jpype import tfs -from jpype import java, JException +from jpype import JException, java from omc3.utils.mock import cern_network_import from omc3.utils.time_tools import AccDatetime -from typing import Callable, Union, Dict, Tuple, List LOG = logging.getLogger(__name__) pytimber = cern_network_import("pytimber") pjlsa = cern_network_import("pjlsa") try: - pjLSAClient = pjlsa.LSAClient + pjLSAClient = pjlsa.LSAClient # noqa: N816 (it's the real name) except ImportError: - pjLSAClient = object + pjLSAClient = object # noqa: N816 (it's the real name) RELEVANT_BP_CONTEXTS = ("OPERATIONAL", "MD") RELEVANT_BP_CATEGORIES = ("DISCRETE",) @@ -39,7 +41,7 @@ class LSAClient(pjLSAClient): """Extension of the LSAClient.""" def __getattr__(self, item): - """ Overwrite __getattr__ to raise the proper import errors at the proper time.""" + """Overwrite __getattr__ to raise the proper import errors at the proper time.""" try: super().__getattr__(item) except AttributeError as e: @@ -71,7 +73,7 @@ def find_knob_names(self, accelerator: str = "lhc", regexp: str = "") -> list: return sorted(filter(reg.search, [pp.getName() for pp in lst])) return sorted(pp.getName() for pp in lst) - def find_existing_knobs(self, knobs: List[str]) -> List[str]: + def find_existing_knobs(self, knobs: list[str]) -> list[str]: """ Return only the knobs that exist from the given list. This function was created out of the need to filter these first, @@ -93,8 +95,8 @@ def find_existing_knobs(self, knobs: List[str]) -> List[str]: return knobs def find_last_fill( - self, acc_time: AccDatetime, accelerator: str = "lhc", source: str = "nxcals" - ) -> Tuple[str, list]: + self, acc_time: AccDatetime, accelerator: str = "lhc", source: str = "nxcals" + ) -> tuple[str, list]: """ Return last fill name and BeamProcesses. @@ -105,11 +107,12 @@ def find_last_fill( Returns: tuple: Last fill name (str), Beamprocesses of last fill (list). - """ + """ start_time = acc_time.sub(days=1) # assumes a fill is not longer than a day try: fills = self.find_beamprocess_history( - t_start=start_time, t_end=acc_time, + t_start=start_time, + t_end=acc_time, accelerator=accelerator, source=source, ) @@ -121,8 +124,12 @@ def find_last_fill( return last_fill, fills[last_fill] def find_beamprocess_history( - self, t_start: AccDatetime, t_end: AccDatetime, accelerator: str = "lhc", source: str = "nxcals" - ) -> Dict: + self, + t_start: AccDatetime, + t_end: AccDatetime, + accelerator: str = "lhc", + source: str = "nxcals", + ) -> dict: """ Finds the BeamProcesses between t_start and t_end and sorts then by fills. Adapted from pjlsa's FindBeamProcessHistory but with source pass-through @@ -138,7 +145,9 @@ def find_beamprocess_history( Dictionary of fills (keys) with a list of Timestamps and BeamProcesses. 
""" - cts = self.findUserContextMappingHistory(t_start.timestamp(), t_end.timestamp(), accelerator=accelerator) + cts = self.findUserContextMappingHistory( + t_start.timestamp(), t_end.timestamp(), accelerator=accelerator + ) db = pytimber.LoggingDB(source=source, loglevel=logging.WARNING) fillnts, fillnv = try_to_acquire_data( @@ -146,7 +155,9 @@ def find_beamprocess_history( )["HX:FILLN"] if not len(fillnv): - raise ValueError(f"No beamprocesses for {accelerator} ({source}) found between {t_start} - {t_end}.") + raise ValueError( + f"No beamprocesses for {accelerator} ({source}) found between {t_start} - {t_end}." + ) LOG.debug(f"{len(fillnts)} fills aqcuired.") # map beam-processes to fills @@ -159,9 +170,12 @@ def find_beamprocess_history( return fills def get_trim_history( - self, beamprocess: str, knobs: list, - start_time: AccDatetime = None, end_time: AccDatetime = None, - accelerator: str = "lhc" + self, + beamprocess: str, + knobs: list, + start_time: AccDatetime = None, + end_time: AccDatetime = None, + accelerator: str = "lhc", ) -> dict: """ Get trim history for knobs between specified times. @@ -194,21 +208,26 @@ def get_trim_history( LOG.debug(f"Getting trims for {len(knobs)} knobs.") try: - trims = self.getTrims(parameter=knobs, beamprocess=beamprocess, start=start_time, end=end_time) + trims = self.getTrims( + parameter=knobs, beamprocess=beamprocess, start=start_time, end=end_time + ) except jpype.java.lang.NullPointerException as e: # In the past this happened, when a knob was not defined, but # this should have been caught by the filter_existing_knobs above - raise ValueError(f"Something went wrong when extracting trims for the knobs: {knobs}") from e + raise ValueError( + f"Something went wrong when extracting trims for the knobs: {knobs}" + ) from e LOG.debug(f"{len(trims)} trims extracted.") - trims_not_found = [k for k in knobs if k not in trims.keys()] + trims_not_found = [k for k in knobs if k not in trims] if len(trims_not_found): LOG.warning( f"The following knobs were not found in '{beamprocess}' " - f"or had no trims during the given time: {trims_not_found}") + f"or had no trims during the given time: {trims_not_found}" + ) return trims - def get_beamprocess_info(self, beamprocess: Union[str, object]) -> Dict: + def get_beamprocess_info(self, beamprocess: str | object) -> dict: """ Get context info of the given beamprocess. @@ -226,8 +245,10 @@ def get_beamprocess_info(self, beamprocess: Union[str, object]) -> Dict: return bp_dict def find_active_beamprocess_at_time( - self, acc_time: AccDatetime, accelerator: str = "lhc", - bp_group: str = "POWERCONVERTERS" # the Beamprocesses relevant for OMC, + self, + acc_time: AccDatetime, + accelerator: str = "lhc", + bp_group: str = "POWERCONVERTERS", # the Beamprocesses relevant for OMC, ): """ Find the active beam process at the time given. @@ -250,8 +271,10 @@ def find_active_beamprocess_at_time( ) beamprocess = beamprocessmap.get(bp_group) if beamprocess is None: - raise ValueError(f"No active BeamProcess found for group '{bp_group}' " - f"at time {acc_time.utc_string} UTC.") + raise ValueError( + f"No active BeamProcess found for group '{bp_group}' " + f"at time {acc_time.utc_string} UTC." 
+            )
         LOG.debug(f"Active Beamprocess at time '{acc_time.cern_utc_string()}': {str(beamprocess)}")
         return beamprocess
 
@@ -274,11 +297,11 @@ def get_knob_circuits(self, knob_name: str, optics: str) -> tfs.TfsDataFrame:
         df.headers[HEAD_INFO] = "In MAD-X it should be 'name = name + DELTA * knobValue'"
         knob = self._knobService.findKnob(knob_name)
         if knob is None:
-            raise IOError(f"Knob '{knob_name}' does not exist")
+            raise OSError(f"Knob '{knob_name}' does not exist")
         try:
             knob_settings = knob.getKnobFactors().getFactorsForOptic(optics)
         except jpype.java.lang.IllegalArgumentException:
-            raise IOError(f"Knob '{knob_name}' not available for optics '{optics}'")
+            raise OSError(f"Knob '{knob_name}' not available for optics '{optics}'")
 
         for knob_factor in knob_settings:
             factor = knob_factor.getFactor()
@@ -303,7 +326,9 @@ def get_madx_name_from_circuit(self, circuit: str):
         slist = jpype.java.util.Collections.singletonList(  # python lists did not work (jdilly)
             logical_name
         )
-        madx_name_map = self._deviceService.findMadStrengthNamesByLogicalNames(slist)  # returns a map
+        madx_name_map = self._deviceService.findMadStrengthNamesByLogicalNames(
+            slist
+        )  # returns a map
         madx_name = madx_name_map[logical_name]
         LOG.debug(f"Name conversion: {circuit} -> {logical_name} -> {madx_name}")
         return madx_name
@@ -327,7 +352,6 @@ def __getattr__(cls, attr):
 
             def hooked(*args, **kwargs):
                 result = client_attr(*args, **kwargs)
-                result_is_self = False
                 try:
                     if result == cls._client:
                         # prevent client from becoming unwrapped
@@ -339,14 +363,14 @@ def hooked(*args, **kwargs):
                 return result
 
             return hooked
-        else:
-            return client_attr
+        return client_attr
 
 
 class LSA(metaclass=LSAMeta):
     """Import this class to use LSA like the client without the need to instantiate it.
     Disadvantage: It will always use the default Server.
     """
+
     pass
 
 
@@ -355,10 +379,14 @@ class LSA(metaclass=LSAMeta):
 
 def _beamprocess_to_dict(bp):
     """Converts some fields of the beamprocess (java) to a dictionary."""
-    bp_dict = {'Name': bp.toString(), "Object": bp}
-    bp_dict.update({getter[3:]: str(bp.__getattribute__(getter)())  # __getattr__ does not exist
-                    for getter in dir(bp)
-                    if getter.startswith('get') and "Attribute" not in getter})
+    bp_dict = {"Name": bp.toString(), "Object": bp}
+    bp_dict.update(
+        {
+            getter[3:]: str(bp.__getattribute__(getter)())  # __getattr__ does not exist
+            for getter in dir(bp)
+            if getter.startswith("get") and "Attribute" not in getter
+        }
+    )
     return bp_dict
 
 
@@ -373,15 +401,16 @@ def try_to_acquire_data(function: Callable, *args, **kwargs):
     Returns:
         Return arguments of ``function``
 
-    """
+    """
     retries = MAX_RETRIES
     for tries in range(retries + 1):
         try:
             return function(*args, **kwargs)
         except java.lang.IllegalStateException as e:
-            raise IOError("Could not acquire data, user probably has no access to NXCALS") from e
+            raise OSError("Could not acquire data, user probably has no access to NXCALS") from e
         except JException as e:  # Might be a case for retries
             if "RetryableException" in str(e) and (tries + 1) < retries:
                 LOG.warning(f"Could not acquire data!
Trial no {tries + 1} / {retries}") continue # will go to the next iteratoin of the loop, so retry - raise IOError("Could not acquire data!") from e + raise OSError("Could not acquire data!") from e + raise RuntimeError(f"Could not acquire data after {retries:d} retries.") diff --git a/pylhc/data_extract/timber.py b/pylhc/data_extract/timber.py index 1593d33..294de0d 100644 --- a/pylhc/data_extract/timber.py +++ b/pylhc/data_extract/timber.py @@ -4,6 +4,7 @@ This module provides useful functions to conveniently wrap the functionality of ``pytimber``. """ + import logging from omc3.utils.mock import cern_network_import diff --git a/pylhc/forced_da_analysis.py b/pylhc/forced_da_analysis.py index 32017e1..1c3fd90 100644 --- a/pylhc/forced_da_analysis.py +++ b/pylhc/forced_da_analysis.py @@ -87,11 +87,11 @@ .. _CarlierForcedDA2019: https://journals.aps.org/prab/pdf/10.1103/PhysRevAccelBeams.22.031002 """ -import os + +import contextlib from collections import defaultdict from contextlib import suppress from pathlib import Path -from typing import Tuple import matplotlib as mpl import matplotlib.colors as mcolors @@ -105,9 +105,9 @@ import tfs from generic_parser import EntryPointParameters, entrypoint from generic_parser.entry_datatypes import ( - DictAsString, FALSE_ITEMS, TRUE_ITEMS, + DictAsString, get_instance_faker_meta, get_multi_class, ) @@ -124,10 +124,6 @@ from tfs import TfsDataFrame from tfs.tools import significant_digits -pytimber = cern_network_import('pytimber') -PageStore = cern_network_import('pytimber.pagestore.PageStore') - - from pylhc.constants.forced_da_analysis import ( BSRT_EMITTANCE_TO_METER, BWS_DIRECTIONS, @@ -181,6 +177,9 @@ get_proton_gamma, ) +pytimber = cern_network_import("pytimber") +PageStore = cern_network_import("pytimber.pagestore.PageStore") + LOG = logging_tools.get_logger(__name__) @@ -200,14 +199,13 @@ def __new__(cls, value): if value in TRUE_ITEMS: return True - elif value in FALSE_ITEMS: + if value in FALSE_ITEMS: return False - else: - try: - return Path(value) - except TypeError: - return value + try: + return Path(value) + except TypeError: + return value def _get_pathclass(*other_classes): @@ -233,127 +231,136 @@ def __new__(cls, value): def get_params(): return EntryPointParameters( - kick_directory=dict( - flags=["-k", "--kickdir"], - required=True, - type=PathOrString, - help="Analysis kick_directory containing kick files.", - ), - output_directory=dict( - flags=["-o", "--outdir"], - type=PathOrString, - help="Output kick_directory, if not given subfolder in kick kick_directory", - ), - energy=dict( - flags=["-e", "--energy"], - required=True, - type=get_multi_class(float, int), - help="Beam energy in GeV.", - ), - fill=dict( - flags=["-f", "--fill"], - type=get_multi_class(int, type(None)), - help="Fill that was used. If not given, check out time_around_kicks.", - ), - beam=dict( - flags=["-b", "--beam"], required=True, choices=[1, 2], type=int, help="Beam to use." 
- ), - plane=dict( - flags=["-p", "--plane"], - choices=["X", "Y"], - required=True, - type=str, - help=( + kick_directory={ + "flags": ["-k", "--kickdir"], + "required": True, + "type": PathOrString, + "help": "Analysis kick_directory containing kick files.", + }, + output_directory={ + "flags": ["-o", "--outdir"], + "type": PathOrString, + "help": "Output kick_directory, if not given subfolder in kick kick_directory", + }, + energy={ + "flags": ["-e", "--energy"], + "required": True, + "type": get_multi_class(float, int), + "help": "Beam energy in GeV.", + }, + fill={ + "flags": ["-f", "--fill"], + "type": get_multi_class(int, type(None)), + "help": "Fill that was used. If not given, check out time_around_kicks.", + }, + beam={ + "flags": ["-b", "--beam"], + "required": True, + "choices": [1, 2], + "type": int, + "help": "Beam to use.", + }, + plane={ + "flags": ["-p", "--plane"], + "choices": ["X", "Y"], + "required": True, + "type": str, + "help": ( "Plane of the kicks." # " Give 'XY' for using both planes (e.g. diagonal kicks)." # Future release ), - ), - time_around_kicks=dict( - type=int, - default=TIME_AROUND_KICKS_MIN, - help=( + }, + time_around_kicks={ + "type": int, + "default": TIME_AROUND_KICKS_MIN, + "help": ( "If no fill is given, this defines the time (in minutes) " "when data before the first and after the last kick is extracted." ), - ), - intensity_time_before_kick=dict( - type=int, - nargs=2, - default=TIME_BEFORE_KICK_S, - help=( + }, + intensity_time_before_kick={ + "type": int, + "nargs": 2, + "default": TIME_BEFORE_KICK_S, + "help": ( "Defines the times before the kicks (in seconds) " "which is used for intensity averaging to calculate the losses." ), - ), - intensity_time_after_kick=dict( - type=int, - nargs=2, - default=TIME_AFTER_KICK_S, - help=( + }, + intensity_time_after_kick={ + "type": int, + "nargs": 2, + "default": TIME_AFTER_KICK_S, + "help": ( "Defines the times after the kicks (in seconds) " "which is used for intensity averaging to calculate the losses." ), - ), - normalized_emittance=dict( - type=float, - default=LHC_NOMINAL_EMITTANCE, - help="Assumed NORMALIZED nominal emittance for the machine.", - ), - emittance_tfs=dict( - type=PathOrDataframe, help="Dataframe or Path of pre-saved emittance tfs.", - ), - intensity_tfs=dict( - type=PathOrDataframe, help="Dataframe or Path of pre-saved intensity tfs.", - ), - show_wirescan_emittance=dict( - default=False, - type=BoolOrPathOrDataFrame, - help=( + }, + normalized_emittance={ + "type": float, + "default": LHC_NOMINAL_EMITTANCE, + "help": "Assumed NORMALIZED nominal emittance for the machine.", + }, + emittance_tfs={ + "type": PathOrDataframe, + "help": "Dataframe or Path of pre-saved emittance tfs.", + }, + intensity_tfs={ + "type": PathOrDataframe, + "help": "Dataframe or Path of pre-saved intensity tfs.", + }, + show_wirescan_emittance={ + "default": False, + "type": BoolOrPathOrDataFrame, + "help": ( "Flag if the emittance from wirescan should also be shown, " "can also be a Dataframe or Path of pre-saved emittance bws tfs." ), - ), - timber_db=dict( - type=str, - default="all", - choices=["all", "mdb", "ldb", "nxcals"], - help="Which timber database to use.", - ), - pagestore_db=dict(type=PathOrPagestore, help="(Path to-) presaved timber database"), - fit=dict( - type=str, - default="exponential", - choices=["exponential", "linear"], - help="Fitting function to use (rearranges parameters to make sense).", - ), - emittance_window_length=dict( - help="Length of the moving average window. 
(# data points)", - type=int, - default=ROLLING_AVERAGE_WINDOW, - ), - emittance_outlier_limit=dict( - help="Limit, i.e. cut from mean, on emittance outliers in meter.", - type=float, - default=OUTLIER_LIMIT, - ), - emittance_type=dict( - type=str, - default="average", - choices=["fit_sigma", "average"], - help="Which BSRT data to use (from database).", - ), - show=dict(action="store_true", help="Show plots.",), - plot_styles=dict( - type=str, - nargs="+", - default=["standard"], - help="Which plotting styles to use, either from omc3 styles or default mpl.", - ), - manual_style=dict( - type=DictAsString, - default={}, - help="Additional style rcParameters which update the set of predefined ones.", - ), + }, + timber_db={ + "type": str, + "default": "all", + "choices": ["all", "mdb", "ldb", "nxcals"], + "help": "Which timber database to use.", + }, + pagestore_db={"type": PathOrPagestore, "help": "(Path to-) presaved timber database"}, + fit={ + "type": str, + "default": "exponential", + "choices": ["exponential", "linear"], + "help": "Fitting function to use (rearranges parameters to make sense).", + }, + emittance_window_length={ + "help": "Length of the moving average window. (# data points)", + "type": int, + "default": ROLLING_AVERAGE_WINDOW, + }, + emittance_outlier_limit={ + "help": "Limit, i.e. cut from mean, on emittance outliers in meter.", + "type": float, + "default": OUTLIER_LIMIT, + }, + emittance_type={ + "type": str, + "default": "average", + "choices": ["fit_sigma", "average"], + "help": "Which BSRT data to use (from database).", + }, + show={ + "action": "store_true", + "help": "Show plots.", + }, + plot_styles={ + "type": str, + "nargs": "+", + "default": ["standard"], + "help": "Which plotting styles to use, either from omc3 styles or default mpl.", + }, + manual_style={ + "type": DictAsString, + "default": {}, + "help": "Additional style rcParameters which update the set of predefined ones.", + }, ) @@ -404,7 +411,7 @@ def main(opt): _write_tfs(out_dir, opt.plane, kick_df, intensity_df, emittance_df, emittance_bws_df) # plotting - figs = dict() + figs = {} register_matplotlib_converters() # for datetime plotting style.set_style(opt.plot_styles, opt.manual_style) figs["emittance"] = _plot_emittances( @@ -453,7 +460,7 @@ def _write_tfs( tfs.write(out_dir / outfile_emittance(plane), emittance_df) if emittance_bws_df is not None: tfs.write(out_dir / outfile_emittance_bws(plane), emittance_bws_df) - except (FileNotFoundError, IOError): + except (OSError, FileNotFoundError): LOG.error(f"Cannot write into directory: {str(out_dir)} ") @@ -505,7 +512,7 @@ def _drop_duplicate_indices(df): def _get_dataframes( kick_times: pd.Index, opt: DotDict -) -> Tuple[TfsDataFrame, TfsDataFrame, TfsDataFrame]: +) -> tuple[TfsDataFrame, TfsDataFrame, TfsDataFrame]: """Gets the intensity and emittance dataframes from either input, files or (timber) database.""" db = _get_db(opt) @@ -546,7 +553,7 @@ def _read_tfs(tfs_file_or_path, timespan): """Read previously gathered data (see :meth:`pylhc.forced_da_analysis._write_tfs`).""" try: tfs_df = tfs.read_tfs(tfs_file_or_path, index=TIME_COLUMN) - except IOError: + except OSError: tfs_df = tfs_file_or_path # hopefully tfs_df.index = _convert_time_index(tfs_df.index) @@ -578,13 +585,12 @@ def _filter_emittance_data(df, planes, window_length, limit): df.headers[HEADER_BSRT_ROLLING_WINDOW] = window_length df.headers[HEADER_BSRT_OUTLIER_LIMIT] = limit df = _maybe_add_sum_for_planes(df, planes, column_norm_emittance) - df = _maybe_add_sum_for_planes( + 
return _maybe_add_sum_for_planes( df, planes, lambda p: mean_col(column_norm_emittance(p)), lambda p: err_col(mean_col(column_norm_emittance(p))), ) - return df # Timber Data ------------------------------------------------------------------ @@ -604,7 +610,7 @@ def _get_db(opt): LOG.debug(f"Loading database from file {str(db_path)}") db = PageStore(f"file:{str(db_path)}", str(db_path.with_suffix(""))) if opt.fill is not None: - raise EnvironmentError("'fill' can't be used with pagestore database.") + raise OSError("'fill' can't be used with pagestore database.") else: LOG.debug(" Trying to load database from timber.") try: @@ -626,7 +632,7 @@ def _get_db(opt): error_msg += ( "but there is no database given and no access to timber databases. Aborting." ) - raise EnvironmentError(error_msg) + raise OSError(error_msg) return db @@ -652,7 +658,7 @@ def _get_bctrf_beam_intensity_from_timber(beam, db, timespan): def _get_bsrt_bunch_emittances_from_timber(beam, planes, db, timespan, key_type, nominal_emittance): - dfs = {p: None for p in planes} + dfs = dict.fromkeys(planes) for plane in planes: LOG.debug(f"Getting emittance from BSRT for beam {beam} and plane {plane}.") bunch_emittance_key = bsrt_emittance_key(beam, plane, key_type) @@ -671,7 +677,7 @@ def _get_bsrt_bunch_emittances_from_timber(beam, planes, db, timespan, key_type, y_new[f"{x_elem:.3f}"] += y_elem.tolist() # get average and std per timestamp - x = np.array([float(elem) for elem in y_new.keys()]) + x = np.array([float(elem) for elem in y_new]) y = np.array([np.average(elem) for elem in y_new.values()]) * nominal_emittance y_std = np.array([np.std(elem) for elem in y_new.values()]) * nominal_emittance elif key_type == "average": @@ -682,7 +688,9 @@ def _get_bsrt_bunch_emittances_from_timber(beam, planes, db, timespan, key_type, x, y, y_std = x[y != 0], y[y != 0], y_std[y != 0] df = tfs.TfsDataFrame( - index=_timestamp_to_cerntime_index(x), columns=all_columns, dtype=float, + index=_timestamp_to_cerntime_index(x), + columns=all_columns, + dtype=float, ) df[col_nemittance] = y df[err_col(col_nemittance)] = y_std @@ -695,7 +703,7 @@ def _get_bsrt_bunch_emittances_from_timber(beam, planes, db, timespan, key_type, def _get_bws_emittances_from_timber(beam, planes, db, timespan): - dfs = {p: None for p in planes} + dfs = dict.fromkeys(planes) for plane in planes: LOG.debug(f"Getting emittance from BWS for beam {beam} and plane {plane}.") all_columns = [column_bws_norm_emittance(plane, d) for d in BWS_DIRECTIONS] @@ -773,7 +781,7 @@ def _get_old_kick_file(kick_dir, plane): def _get_new_kick_file(kick_dir, planes): """Kick files from ``omc3``.""" - dfs = {p: None for p in planes} + dfs = dict.fromkeys(planes) for plane in planes: path = kick_dir / f"{KICKFILE}_{plane.lower()}{TFS_SUFFIX}" LOG.debug(f"Reading kickfile '{str(path)}'.'") @@ -785,10 +793,7 @@ def _get_new_kick_file(kick_dir, planes): def _get_output_dir(kick_directory, output_directory): kick_path = Path(kick_directory) - if output_directory: - output_path = Path(output_directory) - else: - output_path = kick_path / RESULTS_DIR + output_path = Path(output_directory) if output_directory else kick_path / RESULTS_DIR try: output_path.mkdir(exist_ok=True) except PermissionError: @@ -806,11 +811,10 @@ def _get_output_dir(kick_directory, output_directory): def _add_intensity_and_losses_to_kicks(kick_df, intensity_df, time_before, time_after): LOG.debug("Calculating intensity and losses for the kicks.") col_list = [INTENSITY_BEFORE, INTENSITY_AFTER, INTENSITY_LOSSES] - new_columns = 
[col for col in col_list + [err_col(c) for c in col_list]] + new_columns = list(col_list + [err_col(c) for c in col_list]) kick_df = kick_df.reindex(columns=kick_df.columns.tolist() + new_columns) kick_df = _get_intensities_around_kicks(kick_df, intensity_df, time_before, time_after) - kick_df = _calculate_intensity_losses_at_kicks(kick_df) - return kick_df + return _calculate_intensity_losses_at_kicks(kick_df) def _get_intensities_around_kicks(kick_df, intensity_df, time_before, time_after): @@ -895,7 +899,7 @@ def fun_exp_decay(p, x): # fit and plot def fun_exp_sigma(p, x): # only used for plotting """p = DA_sigma, x = action (J_sigma)""" - return np.exp(-0.5 * (p ** 2 - x ** 2)) + return np.exp(-0.5 * (p**2 - x**2)) def fun_linear(p, x): # fit and plot @@ -981,7 +985,12 @@ def _fit_odr(fun, x, y, sx, sy, init): """ODR Fit (includes errors).""" # fill zero errors with the minimum error - otherwise fit will not work fit_model_sigma = scipy.odr.Model(fun) - data_model_sigma = scipy.odr.RealData(x=x, y=y, sx=sx, sy=sy,) + data_model_sigma = scipy.odr.RealData( + x=x, + y=y, + sx=sx, + sy=sy, + ) da_odr = scipy.odr.ODR(data_model_sigma, fit_model_sigma, beta0=init) # da_odr.set_job(fit_type=2) odr_output = da_odr.run() @@ -1010,7 +1019,7 @@ def _convert_to_sigmas(plane, kick_df): ) LOG.info( f"Measured Emittance {emittance_sign} ± {emittance_sign_std} pm" - f" (Nominal {nominal_emittance*1e12: .2f} pm)" + f" (Nominal {nominal_emittance * 1e12: .2f} pm)" ) # DA (in units of J) to DA_sigma @@ -1053,9 +1062,7 @@ def _plot_intensity(directory, beam, plane, kick_df, intensity_df): # convert to % relative to before first kick idx_before = get_approximate_index( - intensity_df, - kick_df.index.min() - pd.Timedelta(seconds=x_span[0]), - method="ffill" + intensity_df, kick_df.index.min() - pd.Timedelta(seconds=x_span[0]), method="ffill" ) idx_intensity = intensity_df.columns.get_loc(INTENSITY) # for iloc intensity_start = intensity_df.iloc[idx_before, idx_intensity] @@ -1096,11 +1103,11 @@ def _plot_intensity(directory, beam, plane, kick_df, intensity_df): ax.text( _date2num(kick), 0.5 * sum(normalized_intensity.loc[kick, :]), - " -{:.1f}$\pm${:.1f} %\n".format(*normalized_losses.loc[kick, :]) - + " (-{:.1f}$\pm${:.1f} %)".format(*normalized_losses_kick.loc[kick, :]), + " -{:.1f}$\\pm${:.1f} %\n".format(*normalized_losses.loc[kick, :]) + + r" (-{:.1f}$\pm${:.1f} %)".format(*normalized_losses_kick.loc[kick, :]), va="bottom", color=colors.get_mpl_color(1), - fontdict=dict(fontsize=mpl.rcParams["font.size"] * 0.8), + fontdict={"fontsize": mpl.rcParams["font.size"] * 0.8}, ) _plot_kicks_and_scale_x(ax, kick_df.index, pad=x_span) @@ -1112,7 +1119,7 @@ def _plot_intensity(directory, beam, plane, kick_df, intensity_df): plt.tight_layout() annotations.set_name(f"Intensity Beam {beam}, Plane {plane}", fig) annotations.set_annotation( - f"Intensity at 100%: {intensity_start*1e-10:.3f}" "$\;\cdot\;10^{{10}}$ charges", + rf"Intensity at 100%: {intensity_start * 1e-10:.3f}$\;\cdot\;10^{{{{10}}}}$ charges", ax=ax, position="left", ) @@ -1229,7 +1236,7 @@ def _plot_da_fit(directory, beam, plane, k_df, fit_type): emittance = kick_df[col_emittance] da, da_err = kick_df.headers[header_da(plane)], kick_df.headers[header_da_error(plane)] da_mu, da_err_mu = significant_digits(da * 1e6, da_err * 1e6) - da_label = f"Fit: DA$_J$= ${da_mu} \pm {da_err_mu} \mu m$" + da_label = rf"Fit: DA$_J$= ${da_mu} \pm {da_err_mu} \mu m$" if fit_type == "linear": fit_fun = fun_linear @@ -1245,7 +1252,7 @@ def _plot_da_fit(directory, beam, 
plane, k_df, fit_type): kick_df.headers[header_da_error(plane, unit="sigma")], ) da_round, da_err_round = significant_digits(da, da_err) - da_label = f"Fit: DA= ${da_round} \pm {da_err_round} N_{{\sigma}}$" + da_label = rf"Fit: DA= ${da_round} \pm {da_err_round} N_{{\sigma}}$" fit_fun = fun_exp_sigma fit_data = action multiplier = 100 # for percentages @@ -1269,7 +1276,7 @@ def _plot_da_fit(directory, beam, plane, k_df, fit_type): da_string = "2DA$_J$" elif fit_type == "norm": da_x = da - da_string = "DA$_\sigma$" + da_string = r"DA$_\sigma$" if action_max < da: if fit_type in ["linear", "exponential"]: @@ -1318,18 +1325,18 @@ def _plot_da_fit(directory, beam, plane, k_df, fit_type): x=0, y=1.00, s=( - f"$\epsilon_{{mean}}$ = {emittance_sign} $\pm$ {emittance_sign_std} pm " - f"($\epsilon_{{nominal}}$ = {nominal_emittance*1e12: .2f} pm)" + rf"$\epsilon_{{mean}}$ = {emittance_sign} $\pm$ {emittance_sign_std} pm " + rf"($\epsilon_{{nominal}}$ = {nominal_emittance * 1e12: .2f} pm)" ), transform=ax.transAxes, va="bottom", ha="left", ) ax.set_xlabel( - f"$N_{{\sigma}} = \sqrt{{2J_{{{plane if len(plane) == 1 else ''}}}/\epsilon}}$" + rf"$N_{{\sigma}} = \sqrt{{2J_{{{plane if len(plane) == 1 else ''}}}/\epsilon}}$" ) else: - ax.set_xlabel(f"$2J_{{{plane if len(plane) == 1 else ''}}} \; [\mu m]$") + ax.set_xlabel(rf"$2J_{{{plane if len(plane) == 1 else ''}}} \; [\mu m]$") if fit_type == "linear": ax.set_ylabel(r"ln($I/I_0$)") @@ -1360,9 +1367,10 @@ def _get_fit_plot_data(da, da_err, data, fit_type): # Helper --- + def get_approximate_index(df, item, method="nearest"): - """ Emulates the `get_loc` from pandas<2.0, i.e. - single index input and output. """ + """Emulates the `get_loc` from pandas<2.0, i.e. + single index input and output.""" return df.index.get_indexer([item], method=method)[0] @@ -1395,7 +1403,10 @@ def _maybe_add_sum_for_planes(df, planes, col_fun, col_err_fun=None): """In case planes == 'XY' add the two plane columns and their errors.""" if len(planes) > 1: if col_err_fun is not None: - cols = lambda p: [col_fun(p), col_err_fun(p)] + + def cols(p): + return [col_fun(p), col_err_fun(p)] + x_cols, y_cols = [cols(p) for p in planes] df = df.reindex(columns=df.columns.to_list() + cols(planes)) df[cols(planes)] = np.array( @@ -1421,20 +1432,18 @@ def _date2num(times): except AttributeError: pass # probably datetime already except TypeError: - try: # not iterable + with contextlib.suppress(AttributeError): times = times.datetime - except AttributeError: - pass # probably datetime already return mdates.date2num(times) def _save_fig(directory, plane, fig, ptype): try: for ftype in PLOT_FILETYPES: - path = os.path.join(directory, outfile_plot(ptype, plane, ftype)) + path = Path(directory) / outfile_plot(ptype, plane, ftype) LOG.debug(f"Saving Figure to {path}") fig.savefig(path) - except IOError: + except OSError: LOG.error(f"Couldn't create output files for {ptype} plots.") diff --git a/pylhc/kickgroups.py b/pylhc/kickgroups.py index 51e80f8..77b88c8 100644 --- a/pylhc/kickgroups.py +++ b/pylhc/kickgroups.py @@ -56,15 +56,14 @@ values for the first ones. A value of zero means showing all files in the group. 
""" + import argparse import json - from datetime import datetime from pathlib import Path import numpy as np import pandas as pd - from dateutil import tz from omc3.utils import logging_tools from pandas import DataFrame @@ -106,7 +105,9 @@ # List Kickgroups -------------------------------------------------------------- -def list_available_kickgroups(by: str = TIMESTAMP, root: Path | str = KICKGROUPS_ROOT, printout: bool = True) -> DataFrame: +def list_available_kickgroups( + by: str = TIMESTAMP, root: Path | str = KICKGROUPS_ROOT, printout: bool = True +) -> DataFrame: """ List all available KickGroups in `root` with optional sorting.. @@ -174,7 +175,9 @@ def get_kickgroup_info(kick_group: str, root: Path | str = KICKGROUPS_ROOT) -> T LOG.debug(f"Loading info from all KickFiles in KickGroup '{kick_group}'") kick_group_data = _load_json(Path(root) / f"{kick_group}.json") kicks_files = kick_group_data["jsonFiles"] - df_info = TfsDataFrame(index=range(len(kicks_files)), columns=KICK_COLUMNS, headers={KICKGROUP: kick_group}) + df_info = TfsDataFrame( + index=range(len(kicks_files)), columns=KICK_COLUMNS, headers={KICKGROUP: kick_group} + ) if not len(kicks_files): raise ValueError(f"KickGroup {kick_group} contains no kicks.") @@ -232,8 +235,12 @@ def load_kickfile(kickfile: Path | str) -> pd.Series: data[TUNEX] = kick["excitationSettings"][0]["acDipoleSettings"][idx]["measuredTune"] data[TUNEY] = kick["excitationSettings"][0]["acDipoleSettings"][idy]["measuredTune"] - data[DRIVEN_TUNEX] = data[TUNEX] + kick["excitationSettings"][0]["acDipoleSettings"][idx]["deltaTuneStart"] - data[DRIVEN_TUNEY] = data[TUNEY] + kick["excitationSettings"][0]["acDipoleSettings"][idy]["deltaTuneStart"] + data[DRIVEN_TUNEX] = ( + data[TUNEX] + kick["excitationSettings"][0]["acDipoleSettings"][idx]["deltaTuneStart"] + ) + data[DRIVEN_TUNEY] = ( + data[TUNEY] + kick["excitationSettings"][0]["acDipoleSettings"][idy]["deltaTuneStart"] + ) data[DRIVEN_TUNEZ] = kick["excitationData"][0]["rfdata"]["excitationFrequency"] data[AMPX] = kick["excitationSettings"][0]["acDipoleSettings"][idx]["amplitude"] data[AMPY] = kick["excitationSettings"][0]["acDipoleSettings"][idy]["amplitude"] @@ -244,9 +251,9 @@ def load_kickfile(kickfile: Path | str) -> pd.Series: for plane in ["X", "Y"]: tune, driven_tune, amp = entry_map[plane] - data[tune] = np.NaN - data[driven_tune] = np.NaN - data[amp] = np.NaN + data[tune] = np.nan + data[driven_tune] = np.nan + data[amp] = np.nan try: idx = _get_plane_index(kick["excitationSettings"], plane) @@ -254,7 +261,9 @@ def load_kickfile(kickfile: Path | str) -> pd.Series: LOG.warning(f"{str(e)} in {kickfile}") continue - if "measuredTune" not in kick["excitationSettings"][idx]: # Happens in very early files in 2022 + if ( + "measuredTune" not in kick["excitationSettings"][idx] + ): # Happens in very early files in 2022 LOG.warning(f"No measured tune {plane} in the kick file: {kickfile}") continue @@ -262,14 +271,15 @@ def load_kickfile(kickfile: Path | str) -> pd.Series: data[driven_tune] = data[tune] + _get_delta_tune(kick, idx) data[amp] = kick["excitationSettings"][idx]["amplitude"] - data[DRIVEN_TUNEZ] = np.NaN - data[AMPZ] = np.NaN + data[DRIVEN_TUNEZ] = np.nan + data[AMPZ] = np.nan return data + def _get_delta_tune(kick: dict, idx_plane: int) -> float: - """ Return the delta from the tune for the kicks. - For some reason, there are multiple different keys where this can be stored. """ + """Return the delta from the tune for the kicks. 
+ For some reason, there are multiple different keys where this can be stored.""" # Default key for ACDipole --- # There is also "deltaTuneEnd", but we usually don't change the delta during kick @@ -293,8 +303,8 @@ def _get_delta_tune(kick: dict, idx_plane: int) -> float: raise KeyError(f"Could not find delta tune for plane-entry {idx_plane}") -def _find_existing_file_path(path: str|Path) -> Path: - """ Find the existing kick file for the kick group. """ +def _find_existing_file_path(path: str | Path) -> Path: + """Find the existing kick file for the kick group.""" path = Path(path) if path.is_file(): return path @@ -304,7 +314,7 @@ def _find_existing_file_path(path: str|Path) -> Path: if fill_data in path.parts: # Fills are moved at the end of year - idx = path.parts.index(fill_data)+1 + idx = path.parts.index(fill_data) + 1 new_path = Path(*path.parts[:idx], all_fill_data, *path.parts[idx:]) if new_path.exists(): return new_path @@ -312,7 +322,6 @@ def _find_existing_file_path(path: str|Path) -> Path: raise FileNotFoundError(f"Could not find kick file at {path}") - # Functions with console output --- # Full Info - @@ -353,7 +362,9 @@ def _print_kickgroup_info(kicks_info: TfsDataFrame) -> None: # Files only - -def show_kickgroup_files(kick_group: str, nfiles: int = None, root: Path | str = KICKGROUPS_ROOT) -> None: +def show_kickgroup_files( + kick_group: str, nfiles: int = None, root: Path | str = KICKGROUPS_ROOT +) -> None: """ Wrapper around `pylhc.kickgroups.get_kickgroup_info`, gathering the relevant information from all kickfiles in the KickGroup and printing only the sdds-filepaths @@ -454,7 +465,7 @@ def _get_plane_index(data: list[dict], plane: str) -> str: def _get_fill_from_path(sdds_path: str | Path) -> str: - """ Get the fill number from the path to the sdds file. + """Get the fill number from the path to the sdds file. Note: Not sure why the fill is not saved automatically into the .json file. Maybe we should ask OP to include this. """ diff --git a/pylhc/lsa_to_madx.py b/pylhc/lsa_to_madx.py index c4b4fc1..460f9a6 100644 --- a/pylhc/lsa_to_madx.py +++ b/pylhc/lsa_to_madx.py @@ -39,7 +39,7 @@ One should not be surprised if long ``LSA`` knob names appear slightly differently in the created ``MAD-X`` files, then functionality stays intact. For instance, the knob ``LHCBEAM/MD_ATS_2022_05_04_B1_RigidWaitsShift_IP1pos`` will lead to the following trim variable definition: - + .. code-block:: fortran trim_D_ATS_2022_05_04_B1_RigidWaitsShift_IP1pos = 1.0; @@ -90,19 +90,17 @@ --optics R2017aT_A30C30A10mL300_CTPPS2 \\ --file knobs.txt -Hint: the knobs active at a given time can be retrieved with the `~pylhc.machine_settings_info` script. +Hint: the knobs active at a given time can be retrieved with the `~pylhc.machine_settings_info` script. """ + import argparse import re import string - from pathlib import Path -from typing import Dict import numpy as np import pandas as pd import tfs - from omc3.utils import logging_tools from omc3.utils.contexts import timeit @@ -114,7 +112,7 @@ # ----- Helper functions ----- # -def parse_knobs_and_trim_values_from_file(knobs_file: Path) -> Dict[str, float]: +def parse_knobs_and_trim_values_from_file(knobs_file: Path) -> dict[str, float]: """ Parses a file for LSA knobs and their trim values. Each line should be a knob name following by a number of the trim value. 
If no value is written, it defaults @@ -126,7 +124,9 @@ def parse_knobs_and_trim_values_from_file(knobs_file: Path) -> Dict[str, float]: Returns: A `dict` with as keys the parsed knob names and as values their associated trims. """ - knob_lines = [line for line in Path(knobs_file).read_text().splitlines() if not line.startswith("#")] + knob_lines = [ + line for line in Path(knobs_file).read_text().splitlines() if not line.startswith("#") + ] results = {} for line in knob_lines: @@ -169,9 +169,13 @@ def get_sign_madx_vs_lsa(madx_name: str) -> int: return 1 -def get_madx_script_from_definition_dataframe(deltas_df: tfs.TfsDataFrame, lsa_knob: str, trim: float = 1.0, - by_reference: bool = True, verbose: bool = False - ) -> str: +def get_madx_script_from_definition_dataframe( + deltas_df: tfs.TfsDataFrame, + lsa_knob: str, + trim: float = 1.0, + by_reference: bool = True, + verbose: bool = False, +) -> str: """ Given the extracted definition dataframe of an LSA knob - as returned by `~pylhc.data_extract.lsa.LSAClient.get_knob_circuits` - this function will generate the @@ -207,7 +211,7 @@ def get_madx_script_from_definition_dataframe(deltas_df: tfs.TfsDataFrame, lsa_k # write all inits first (looks nicer in madx) if by_reference: - for variable in deltas.keys(): + for variable in deltas.keys(): # noqa: SIM118 (this is not a dict) variable_init = f"{variable}_init" change_commands.append(f"{variable_init:<17} = {variable};") @@ -218,9 +222,13 @@ def get_madx_script_from_definition_dataframe(deltas_df: tfs.TfsDataFrame, lsa_k # mess up parsing of "var = var + -value" if delta_k is negative if by_reference: variable_init = f"{variable}_init" - change_commands.append(f"{variable:<12} := {variable_init:^19} + ({delta:^25}) * {trim_variable};") + change_commands.append( + f"{variable:<12} := {variable_init:^19} + ({delta:^25}) * {trim_variable};" + ) else: - change_commands.append(f"{variable:<12} = {variable:^15} + ({delta:^25}) * {trim_variable};") + change_commands.append( + f"{variable:<12} = {variable:^15} + ({delta:^25}) * {trim_variable};" + ) change_commands.append(f"! End of change commands for knob: {lsa_knob}\n") return "\n".join(change_commands) @@ -261,7 +269,7 @@ def _get_trim_variable(lsa_knob: str) -> str: def _get_delta(deltas_df: tfs.TfsDataFrame) -> pd.Series: - """ Get the correct delta-column """ + """Get the correct delta-column""" if "DELTA_K" not in deltas_df.columns: LOG.debug("Using DELTA_KL column.") return deltas_df.DELTA_KL @@ -271,12 +279,16 @@ def _get_delta(deltas_df: tfs.TfsDataFrame) -> pd.Series: return deltas_df.DELTA_K if (deltas_df.DELTA_K.astype(bool) & deltas_df.DELTA_KL.astype(bool)).any(): - raise ValueError("Some entries of DELTA_KL and DELTA_K seem to both be given. " - "This looks like a bug. Please investigate.") + raise ValueError( + "Some entries of DELTA_KL and DELTA_K seem to both be given. " + "This looks like a bug. Please investigate." + ) LOG.debug("Both DELTA_K and DELTA_KL columns present, merging columns.") - return pd.Series(np.where(deltas_df.DELTA_K.astype(bool), deltas_df.DELTA_K, deltas_df.DELTA_KL), - index=deltas_df.index) + return pd.Series( + np.where(deltas_df.DELTA_K.astype(bool), deltas_df.DELTA_K, deltas_df.DELTA_KL), + index=deltas_df.index, + ) # ----- Script Part ----- # @@ -292,7 +304,11 @@ def _get_args(): "scripts reproducing the provided knobs." ) parser.add_argument( - "--optics", dest="optics", type=str, required=True, help="The LSA name of the optics for which the knobs are defined." 
+ "--optics", + dest="optics", + type=str, + required=True, + help="The LSA name of the optics for which the knobs are defined.", ) parser.add_argument( "--knobs", @@ -330,7 +346,7 @@ def main(): LOG.info(f"Loading knob names from file '{options.file}'") knobs_dict = parse_knobs_and_trim_values_from_file(Path(options.file)) else: # given at the command line with --knobs, we initialise trim values to 1 - knobs_dict = {knob: 1.0 for knob in options.knobs} + knobs_dict = dict.fromkeys(options.knobs, 1.0) LOG.info("Instantiating LSA client") lsa_client = LSAClient() @@ -340,13 +356,17 @@ def main(): for lsa_knob, trim_value in knobs_dict.items(): LOG.info(f"Processing LSA knob '{lsa_knob}'") try: # next line might raise if knob not defined for the given optics - knob_definition = lsa_client.get_knob_circuits(knob_name=lsa_knob, optics=lsa_optics) + knob_definition = lsa_client.get_knob_circuits( + knob_name=lsa_knob, optics=lsa_optics + ) madx_commands_string = get_madx_script_from_definition_dataframe( deltas_df=knob_definition, lsa_knob=lsa_knob, trim=trim_value ) - except (OSError, IOError): # raised by pjlsa if knob not found - LOG.warning(f"Could not find knob '{lsa_knob}' in the given optics '{lsa_optics}' - skipping") + except OSError: # raised by pjlsa if knob not found + LOG.warning( + f"Could not find knob '{lsa_knob}' in the given optics '{lsa_optics}' - skipping" + ) unfound_knobs.append(lsa_knob) else: # we've managed to find knobs @@ -360,7 +380,10 @@ def main(): Path(madx_file).write_text(madx_commands_string) if unfound_knobs: - LOG.info(f"The following knobs could not be found in the '{lsa_optics}' optics: \n\t\t" + "\n\t\t".join(unfound_knobs)) + LOG.info( + f"The following knobs could not be found in the '{lsa_optics}' optics: \n\t\t" + + "\n\t\t".join(unfound_knobs) + ) if __name__ == "__main__": diff --git a/pylhc/machine_settings_info.py b/pylhc/machine_settings_info.py index 5a2dd5f..891a043 100644 --- a/pylhc/machine_settings_info.py +++ b/pylhc/machine_settings_info.py @@ -7,9 +7,9 @@ If an output path is given, all info will be written into tfs files, otherwise a summary is logged into console. -Knob values can be extracted and the knob definition gathered. +Knob values can be extracted and the knob definition gathered. For brevity reasons, this data is not logged into the summary in the console. -If a start time is given, the trim history for the given knobs can be written out as well. +If a start time is given, the trim history for the given knobs can be written out as well. This data is also not logged. Can be run from command line, parameters as given in :meth:`pylhc.machine_settings_info.get_info`. 
@@ -47,29 +47,34 @@ :author: jdilly """ + from collections import OrderedDict, namedtuple +from collections.abc import Iterable +from pathlib import Path import tfs from generic_parser import DotDict, EntryPointParameters, entrypoint from generic_parser.entry_datatypes import get_instance_faker_meta +from omc3.knob_extractor import KNOB_CATEGORIES, name2lsa from omc3.utils import logging_tools from omc3.utils.iotools import PathOrStr from omc3.utils.time_tools import AccDatetime, AcceleratorDatetime -from pathlib import Path -from typing import Tuple, Iterable, Dict, Union -from omc3.knob_extractor import name2lsa, KNOB_CATEGORIES from pylhc.constants import machine_settings_info as const -from pylhc.data_extract.lsa import COL_NAME as LSA_COLUMN_NAME, LSA +from pylhc.data_extract.lsa import COL_NAME as LSA_COLUMN_NAME +from pylhc.data_extract.lsa import LSA LOG = logging_tools.get_logger(__name__) class AccDatetimeOrStr(metaclass=get_instance_faker_meta(AccDatetime, str)): """A class that accepts AccDateTime and strings.""" + def __new__(cls, value): if isinstance(value, str): - value = value.strip("\'\"") # behavior like dict-parser, IMPORTANT FOR EVERY STRING-FAKER + value = value.strip( + "'\"" + ) # behavior like dict-parser, IMPORTANT FOR EVERY STRING-FAKER return value @@ -79,140 +84,133 @@ def __new__(cls, value): def _get_params() -> dict: """Parse Commandline Arguments and return them as options.""" return EntryPointParameters( - time=dict( - default=None, - type=AccDatetimeOrStr, - help=("UTC Time as 'Y-m-d H:M:S.f' or ISO format or AccDatetime object." - " Acts as point in time or end time (if ``start_time`` is given).") + time={ + "default": None, + "type": AccDatetimeOrStr, + "help": ( + "UTC Time as 'Y-m-d H:M:S.f' or ISO format or AccDatetime object." + " Acts as point in time or end time (if ``start_time`` is given)." + ), + }, + start_time={ + "default": None, + "type": AccDatetimeOrStr, + "help": ( + "UTC Time as 'Y-m-d H:M:S.f' or ISO format or AccDatetime object." + " Defines the beginning of the time-range." ), - start_time=dict( - default=None, - type=AccDatetimeOrStr, - help=("UTC Time as 'Y-m-d H:M:S.f' or ISO format or AccDatetime object." - " Defines the beginning of the time-range.") - ), - knobs=dict( - default=None, - nargs="+", - type=str, - help="List of knobnames. " - "If `None` (or omitted) no knobs will be extracted. " - "If it is just the string ``'all'``, " - "all knobs will be extracted (can be slow). " - "Use the string ``'default'`` for pre-defined knobs of interest." - ), - accel=dict( - default='lhc', - type=str, - help="Accelerator name."), - beamprocess=dict( - type=str, - help=("Manual override for the Beamprocess " - "(otherwise taken at the given ``time``)") - ), - output_dir=dict( - default=None, - type=PathOrStr, - help="Output directory."), - knob_definitions=dict( - action="store_true", - help="Set to extract knob definitions."), - source=dict( - type=str, - default="nxcals", - help="Source to extract data from."), - log=dict( - action="store_true", - help="Write summary into log (automatically done if no output path is given)."), + }, + knobs={ + "default": None, + "nargs": "+", + "type": str, + "help": "List of knobnames. " + "If `None` (or omitted) no knobs will be extracted. " + "If it is just the string ``'all'``, " + "all knobs will be extracted (can be slow). 
" + "Use the string ``'default'`` for pre-defined knobs of interest.", + }, + accel={"default": "lhc", "type": str, "help": "Accelerator name."}, + beamprocess={ + "type": str, + "help": ("Manual override for the Beamprocess (otherwise taken at the given ``time``)"), + }, + output_dir={"default": None, "type": PathOrStr, "help": "Output directory."}, + knob_definitions={"action": "store_true", "help": "Set to extract knob definitions."}, + source={"type": str, "default": "nxcals", "help": "Source to extract data from."}, + log={ + "action": "store_true", + "help": "Write summary into log (automatically done if no output path is given).", + }, ) @entrypoint(_get_params(), strict=True) -def get_info(opt) -> Dict[str, object]: +def get_info(opt) -> dict[str, object]: """ - Get info about **Beamprocess**, **Optics** and **Knobs** at given time. + Get info about **Beamprocess**, **Optics** and **Knobs** at given time. - Keyword Args: + Keyword Args: - *--Optional--* + *--Optional--* - - **accel** *(str)*: + - **accel** *(str)*: - Accelerator name. + Accelerator name. - default: ``lhc`` + default: ``lhc`` - - **beamprocess** *(str)*: + - **beamprocess** *(str)*: - Manual override for the Beamprocess - (otherwise taken at the given ``time``) + Manual override for the Beamprocess + (otherwise taken at the given ``time``) - default: ``None`` + default: ``None`` - - **knob_definitions**: + - **knob_definitions**: - Set to extract knob definitions. + Set to extract knob definitions. - action: ``store_true`` + action: ``store_true`` - - **knobs** *(str)*: + - **knobs** *(str)*: - List of knobnames. - If `None` (or omitted) no knobs will be extracted. - If it is just the string ``'all'``, - all knobs will be extracted (can be slow). - Use the string ``'default'`` for pre-defined knobs of interest. - If this is called from python, the strings need - to be put as single items into a list. + List of knobnames. + If `None` (or omitted) no knobs will be extracted. + If it is just the string ``'all'``, + all knobs will be extracted (can be slow). + Use the string ``'default'`` for pre-defined knobs of interest. + If this is called from python, the strings need + to be put as single items into a list. - default: ``None`` + default: ``None`` - - **log**: + - **log**: - Write summary into log (automatically done if no output path is - given). + Write summary into log (automatically done if no output path is + given). - action: ``store_true`` + action: ``store_true`` - - **output_dir** *(PathOrStr)*: + - **output_dir** *(PathOrStr)*: - Output directory. + Output directory. - default: ``None`` + default: ``None`` - - **source** *(str)*: + - **source** *(str)*: - Source to extract data from. + Source to extract data from. - default: ``nxcals`` + default: ``nxcals`` - - **start_time** *(AccDatetime, str)*: + - **start_time** *(AccDatetime, str)*: - UTC Time as 'Y-m-d H:M:S.f' format or AccDatetime object. - Defines the beginning of the time-range. + UTC Time as 'Y-m-d H:M:S.f' format or AccDatetime object. + Defines the beginning of the time-range. - default: ``None`` + default: ``None`` - - **time** *(AccDatetime, str)*: + - **time** *(AccDatetime, str)*: - UTC Time as 'Y-m-d H:M:S.f' format or AccDatetime object. - Acts as point in time or end time (if ``start_time`` is given). + UTC Time as 'Y-m-d H:M:S.f' format or AccDatetime object. + Acts as point in time or end time (if ``start_time`` is given). 
- default: ``None`` + default: ``None`` - Returns: - dict: Dictionary containing the given ``time`` and ``start_time``, - the extracted ``beamprocess``-info and ``optics``-info, the - ``trim_histories`` and current (i.e. at given ``time``) ``trims`` - and the ``knob_definitions``, if extracted. + Returns: + dict: Dictionary containing the given ``time`` and ``start_time``, + the extracted ``beamprocess``-info and ``optics``-info, the + ``trim_histories`` and current (i.e. at given ``time``) ``trims`` + and the ``knob_definitions``, if extracted. """ if opt.output_dir is None: @@ -228,17 +226,19 @@ def get_info(opt) -> Dict[str, object]: LOG.error(str(e)) else: if opt.knobs is not None: - if len(opt.knobs) == 1 and opt.knobs[0].lower() == 'all': + if len(opt.knobs) == 1 and opt.knobs[0].lower() == "all": opt.knobs = [] # will extract all knobs in get_trim_history - if len(opt.knobs) == 1 and opt.knobs[0].lower() == 'default': - opt.knobs = [name2lsa(knob) for category in KNOB_CATEGORIES.values() - for knob in category] + if len(opt.knobs) == 1 and opt.knobs[0].lower() == "default": + opt.knobs = [ + name2lsa(knob) for category in KNOB_CATEGORIES.values() for knob in category + ] trim_histories = LSA.get_trim_history( - beamprocess_info.Object, opt.knobs, + beamprocess_info.Object, + opt.knobs, start_time=acc_start_time, end_time=acc_time, - accelerator=opt.accel + accelerator=opt.accel, ) trims = _get_last_trim(trim_histories) @@ -255,9 +255,17 @@ def get_info(opt) -> Dict[str, object]: out_path = Path(opt.output_dir) out_path.mkdir(parents=True, exist_ok=True) write_summary(out_path, opt.accel, acc_time, beamprocess_info, optics_info, trims) - + if trim_histories and acc_start_time: - write_trim_histories(out_path, trim_histories, opt.accel, acc_time, acc_start_time, beamprocess_info, optics_info) + write_trim_histories( + out_path, + trim_histories, + opt.accel, + acc_time, + acc_start_time, + beamprocess_info, + optics_info, + ) if knob_definitions: write_knob_defitions(out_path, knob_definitions) @@ -269,15 +277,19 @@ def get_info(opt) -> Dict[str, object]: "optics": optics_info, "trim_histories": trim_histories, "trims": trims, - "knob_definitions": knob_definitions + "knob_definitions": knob_definitions, } # Output ####################################################################### -def log_summary(acc_time: AccDatetime, bp_info: DotDict, - optics_info: DotDict = None, trims: Dict[str, float] = None): +def log_summary( + acc_time: AccDatetime, + bp_info: DotDict, + optics_info: DotDict = None, + trims: dict[str, float] = None, +): """Log the summary. Args: @@ -311,8 +323,12 @@ def log_summary(acc_time: AccDatetime, bp_info: DotDict, def write_summary( - output_path: Path, accel: str, acc_time: AccDatetime, bp_info: DotDict, - optics_info: DotDict = None, trims: Dict[str, float] = None + output_path: Path, + accel: str, + acc_time: AccDatetime, + bp_info: DotDict, + optics_info: DotDict = None, + trims: dict[str, float] = None, ): """Write summary into a ``tfs`` file. 
@@ -328,21 +344,27 @@ def write_summary( trims = trims.items() info_tfs = tfs.TfsDataFrame(trims, columns=[const.column_knob, const.column_value]) - info_tfs.headers = OrderedDict([ - ("Hint:", "All times given in UTC."), - (const.head_accel, accel), - (const.head_time, acc_time.cern_utc_string()), - (const.head_beamprocess, bp_info.Name), - (const.head_fill, bp_info.Fill), - (const.head_beamprocess_start, bp_info.StartTime.cern_utc_string()), - (const.head_context_category, bp_info.ContextCategory), - (const.head_beamprcess_description, bp_info.Description), - ]) + info_tfs.headers = OrderedDict( + [ + ("Hint:", "All times given in UTC."), + (const.head_accel, accel), + (const.head_time, acc_time.cern_utc_string()), + (const.head_beamprocess, bp_info.Name), + (const.head_fill, bp_info.Fill), + (const.head_beamprocess_start, bp_info.StartTime.cern_utc_string()), + (const.head_context_category, bp_info.ContextCategory), + (const.head_beamprcess_description, bp_info.Description), + ] + ) if optics_info is not None: - info_tfs.headers.update(OrderedDict([ - (const.head_optics, optics_info.Name), - (const.head_optics_start, optics_info.StartTime.cern_utc_string()), - ])) + info_tfs.headers.update( + OrderedDict( + [ + (const.head_optics, optics_info.Name), + (const.head_optics_start, optics_info.StartTime.cern_utc_string()), + ] + ) + ) tfs.write(output_path / const.info_name, info_tfs) @@ -354,11 +376,15 @@ def write_knob_defitions(output_path: Path, definitions: dict): def write_trim_histories( - output_path: Path, trim_histories: Dict[str, namedtuple], accel: str, - acc_time: AccDatetime = None, acc_start_time: AccDatetime = None, - bp_info: DotDict = None, optics_info: DotDict = None + output_path: Path, + trim_histories: dict[str, namedtuple], + accel: str, + acc_time: AccDatetime = None, + acc_start_time: AccDatetime = None, + bp_info: DotDict = None, + optics_info: DotDict = None, ): - """ Write the trim histories into tfs files. + """Write the trim histories into tfs files. There are two time columns, one with timestamps as they are usually easier to handle and one with the UTC-string, as they are more human-readable. 
@@ -371,31 +397,33 @@ def write_trim_histories( bp_info (DotDict): BeamProcess Info Dictionary optics_info (DotDict): Optics Info Dictionary """ - AccDT = AcceleratorDatetime[accel] + AccDT = AcceleratorDatetime[accel] # noqa: N806 # Create headers with basic info --- - headers = OrderedDict([("Hint:", "All times are given in UTC."), - (const.head_accel, accel) - ]) - + headers = OrderedDict([("Hint:", "All times are given in UTC."), (const.head_accel, accel)]) + if acc_start_time: headers.update({const.head_start_time: acc_start_time.cern_utc_string()}) if acc_time: headers.update({const.head_end_time: acc_time.cern_utc_string()}) - + if bp_info: - headers.update({ - const.head_beamprocess: bp_info.Name, - const.head_fill: bp_info.Fill, - }) + headers.update( + { + const.head_beamprocess: bp_info.Name, + const.head_fill: bp_info.Fill, + } + ) if optics_info: headers.update({const.head_optics: optics_info.Name}) # Write trim history per knob ---- for knob, trim_history in trim_histories.items(): - trims_tfs = tfs.TfsDataFrame(headers=headers, columns=[const.column_time, const.column_timestamp, const.column_value]) + trims_tfs = tfs.TfsDataFrame( + headers=headers, columns=[const.column_time, const.column_timestamp, const.column_value] + ) for timestamp, value in zip(trim_history.time, trim_history.data): time = AccDT.from_timestamp(timestamp).cern_utc_string() try: @@ -416,7 +444,9 @@ def write_trim_histories( # Beamprocess ################################################################## -def _get_beamprocess(acc_time: AccDatetime, accel: str, source: str, beamprocess: str = None) -> DotDict: +def _get_beamprocess( + acc_time: AccDatetime, accel: str, source: str, beamprocess: str = None +) -> DotDict: """Get the info about the active beamprocess at ``acc_time`` or the given one.""" if beamprocess is None: beamprocess = LSA.find_active_beamprocess_at_time(acc_time, accelerator=accel) @@ -438,7 +468,9 @@ def _get_beamprocess(acc_time: AccDatetime, accel: str, source: str, beamprocess return DotDict(bp_info) -def _get_beamprocess_start(beamprocesses: Iterable[Tuple[float, str]], acc_time: AccDatetime, bp_name: str) -> AccDatetime: +def _get_beamprocess_start( + beamprocesses: Iterable[tuple[float, str]], acc_time: AccDatetime, bp_name: str +) -> AccDatetime: """ Get the last beamprocess in the list of beamprocesses before dt_utc. Returns the start time of the beam-process in utc. @@ -449,9 +481,7 @@ def _get_beamprocess_start(beamprocesses: Iterable[Tuple[float, str]], acc_time: if time <= ts and name == bp_name: LOG.debug(f"Found start for beamprocess '{bp_name}' at timestamp {time}.") return acc_time.__class__.from_timestamp(time) - raise ValueError( - f"Beamprocess '{bp_name}' was not found." 
- ) + raise ValueError(f"Beamprocess '{bp_name}' was not found.") # Optics ####################################################################### @@ -468,7 +498,7 @@ def _get_optics(acc_time: AccDatetime, beamprocess: str, bp_start: AccDatetime) def _get_last_optics( optics_table, bp: str, bp_start: AccDatetime, acc_time: AccDatetime -) -> (str, AccDatetime): +) -> tuple[str, AccDatetime]: """Get the name of the optics at the right time for current beam process.""" ts = acc_time.timestamp() - bp_start.timestamp() item = None @@ -490,7 +520,7 @@ def _get_knob_definitions(knobs: list, optics: str): for knob in knobs: try: defs[knob] = LSA.get_knob_circuits(knob, optics) - except IOError as e: + except OSError as e: LOG.warning(e.args[0]) return defs @@ -505,7 +535,7 @@ def _get_last_trim(trims: dict) -> dict: Dictionary of knob names and their values. """ LOG.debug("Extracting last trim from found trim histories.") - trim_dict = {trim: trims[trim].data[-1] for trim in trims.keys()} # return last set value + trim_dict = {trim: trims[trim].data[-1] for trim in trims} # return last set value for trim, value in trim_dict.items(): try: trim_dict[trim] = value.flatten()[-1] # the very last entry ... @@ -519,8 +549,8 @@ def _get_last_trim(trims: dict) -> dict: # Other ######################################################################## -def _get_times(time: Union[str, AccDatetime], start_time: Union[str, AccDatetime], accel: str): - """ Returns acc_time and acc_start_time parameters depending on the user input. """ +def _get_times(time: str | AccDatetime, start_time: str | AccDatetime, accel: str): + """Returns acc_time and acc_start_time parameters depending on the user input.""" acc_dt = AcceleratorDatetime[accel] def get_time(t, default=None): diff --git a/pyproject.toml b/pyproject.toml index 2a62fe5..cde78c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,3 +92,56 @@ markers = [ # Helpful for pytest-debugging (leave commented out on commit): # log_cli=true # log_level=DEBUG + +# ----- Dev Tools Configuration ----- # + +[tool.ruff] +exclude = [ + ".eggs", + ".git", + ".mypy_cache", + ".venv", + "_build", + "build", + "dist", +] + +# Assume Python 3.10+ +target-version = "py310" + +line-length = 100 +indent-width = 4 + +[tool.ruff.lint] +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" +ignore = [ + "E501", # line too long + "FBT001", # boolean-type-hint-positional-argument + "FBT002", # boolean-default-value-positional-argument + "PT019", # pytest-fixture-param-without-value (but suggested solution fails) +] +extend-select = [ + "F", # Pyflakes rules + "W", # PyCodeStyle warnings + "E", # PyCodeStyle errors + "I", # Sort imports properly + "A", # Detect shadowed builtins + "N", # enforce naming conventions, e.g. ClassName vs function_name + "UP", # Warn if certain things can changed due to newer Python versions + "C4", # Catch incorrect use of comprehensions, dict, list, etc + "FA", # Enforce from __future__ import annotations + "FBT", # detect boolean traps + "ISC", # Good use of string concatenation + "BLE", # disallow catch-all exceptions + "ICN", # Use common import conventions + "RET", # Good return practices + "SIM", # Common simplification rules + "TID", # Some good import practices + "TC", # Enforce importing certain types in a TYPE_CHECKING block + "PTH", # Use pathlib instead of os.path + "NPY", # Some numpy-specific things +] +# Allow fix for all enabled rules (when `--fix`) is provided. 
+fixable = ["ALL"] +unfixable = [] diff --git a/tests/unit/test_bpm_calibration.py b/tests/unit/test_bpm_calibration.py index 673df7f..533ce09 100644 --- a/tests/unit/test_bpm_calibration.py +++ b/tests/unit/test_bpm_calibration.py @@ -1,37 +1,36 @@ +import pathlib from pathlib import Path + import numpy as np import pandas as pd -import pathlib import pytest -from pandas.testing import assert_series_equal, assert_frame_equal - import tfs from generic_parser.dict_parser import ArgumentError +from pandas.testing import assert_frame_equal, assert_series_equal + from pylhc import bpm_calibration as calibration from pylhc.constants.calibration import BPMS -INPUTS_DIR = Path(__file__).parent.parent / 'inputs' / 'calibration' -MEASUREMENTS = INPUTS_DIR / 'measurements' -EXPECTED_OUTPUT = INPUTS_DIR / 'output' +INPUTS_DIR = Path(__file__).parent.parent / "inputs" / "calibration" +MEASUREMENTS = INPUTS_DIR / "measurements" +EXPECTED_OUTPUT = INPUTS_DIR / "output" def test_calibration_same_betabeat(tmp_path): - factors = calibration.main(inputdir=MEASUREMENTS / 'for_beta', - outputdir=tmp_path, - ips=[1, 4, 5]) + calibration.main(inputdir=MEASUREMENTS / "for_beta", outputdir=tmp_path, ips=[1, 4, 5]) # Let's open the tfs files we just created - x_tfs = tfs.read(tmp_path / 'calibration_beta_x.tfs', index='NAME') - y_tfs = tfs.read(tmp_path / 'calibration_beta_y.tfs', index='NAME') + x_tfs = tfs.read(tmp_path / "calibration_beta_x.tfs", index="NAME") + y_tfs = tfs.read(tmp_path / "calibration_beta_y.tfs", index="NAME") # Those tfs need to be filtered because GetLLM only gives us the BPMs # used in ballistic optics x_tfs = x_tfs.reindex(BPMS[1][1] + BPMS[4][1] + BPMS[5][1]) y_tfs = y_tfs.reindex(BPMS[1][1] + BPMS[4][1] + BPMS[5][1]) - + # And the ones created by BetaBeat.src for the same measurements - expected_x_tfs = tfs.read(EXPECTED_OUTPUT / 'calibration_beta_x.tfs', index='NAME') - expected_y_tfs = tfs.read(EXPECTED_OUTPUT / 'calibration_beta_y.tfs', index='NAME') + expected_x_tfs = tfs.read(EXPECTED_OUTPUT / "calibration_beta_x.tfs", index="NAME") + expected_y_tfs = tfs.read(EXPECTED_OUTPUT / "calibration_beta_y.tfs", index="NAME") # BetaBeat's tfs implementation is a bit different, we don't have the # same integer precision @@ -41,7 +40,7 @@ def test_calibration_same_betabeat(tmp_path): tfs_ = [x_tfs, y_tfs, expected_x_tfs, expected_y_tfs] for i in range(len(tfs_)): tfs_[i] = tfs_[i].drop("ERROR_CALIBRATION_FIT", axis=1) - + # Compare the two dataframes assert_frame_equal(tfs_[0], tfs_[2], rtol=precision) assert_frame_equal(tfs_[1], tfs_[3], rtol=precision) @@ -49,18 +48,14 @@ def test_calibration_same_betabeat(tmp_path): def test_bad_args(): with pytest.raises(ArgumentError) as e: - calibration.main(inputdir='wat', - outputdir='', - ips=[1,5]) + calibration.main(inputdir="wat", outputdir="", ips=[1, 5]) assert "inputdir' is not of type Path" in str(e.value) def test_no_beta_tfs(tmp_path): with pytest.raises(FileNotFoundError) as e: - calibration.main(inputdir=pathlib.Path('wat'), - outputdir=tmp_path, - ips=[1,5]) + calibration.main(inputdir=pathlib.Path("wat"), outputdir=tmp_path, ips=[1, 5]) assert "No such file or directory:" in str(e.value) assert "beta_phase_x.tfs" in str(e.value) @@ -68,101 +63,100 @@ def test_no_beta_tfs(tmp_path): def test_wrong_ip(tmp_path): with pytest.raises(ArgumentError) as e: - calibration.main(inputdir=MEASUREMENTS / 'for_beta', - outputdir=tmp_path, - ips=[15, 22]) + calibration.main(inputdir=MEASUREMENTS / "for_beta", outputdir=tmp_path, ips=[15, 22]) err = 
"All elements of 'ips' need to be one of '[1, 4, 5]', instead the list was [15, 22]." assert err in str(e.value) def test_calibration_same_dispersion(tmp_path): - factors = calibration.main(inputdir=MEASUREMENTS / 'for_dispersion', - outputdir=tmp_path, - method='dispersion', - ips=[1,5]) + calibration.main( + inputdir=MEASUREMENTS / "for_dispersion", + outputdir=tmp_path, + method="dispersion", + ips=[1, 5], + ) # Let's open the tfs files we just created - x_tfs = tfs.read(tmp_path / 'calibration_dispersion_x.tfs') - + x_tfs = tfs.read(tmp_path / "calibration_dispersion_x.tfs") + # And the ones created by BetaBeat.src for the same measurements - expected_x_tfs = tfs.read(EXPECTED_OUTPUT / 'calibration_dispersion_x.tfs') + expected_x_tfs = tfs.read(EXPECTED_OUTPUT / "calibration_dispersion_x.tfs") - # Check all the BPMs are indeed the same - assert x_tfs['NAME'].equals(expected_x_tfs['NAME']) + # Check all the BPMs are indeed the same + assert x_tfs["NAME"].equals(expected_x_tfs["NAME"]) precision = 1e-4 # BBsrc was wrong for the calibration error fit and the calibration fits # So we can only check the first column: CALIBRATION - assert_series_equal(x_tfs['CALIBRATION'], expected_x_tfs['CALIBRATION'], atol=precision) + assert_series_equal(x_tfs["CALIBRATION"], expected_x_tfs["CALIBRATION"], atol=precision) def test_beta_equal(tmp_path): - factors = calibration.main(inputdir=MEASUREMENTS / 'same_beta', - outputdir=tmp_path, - method='beta') + factors = calibration.main( + inputdir=MEASUREMENTS / "same_beta", outputdir=tmp_path, method="beta" + ) # beta from phase and beta amp are the same. Calibrations factors should # equal to 1 - expected = np.array([1.0] * len(factors['X']['CALIBRATION'])) - assert (factors['X']['CALIBRATION'].to_numpy() == expected).all() - - expected = np.array([1.0] * len(factors['Y']['CALIBRATION'])) - assert (factors['Y']['CALIBRATION'].to_numpy() == expected).all() + expected = np.array([1.0] * len(factors["X"]["CALIBRATION"])) + assert (factors["X"]["CALIBRATION"].to_numpy() == expected).all() + + expected = np.array([1.0] * len(factors["Y"]["CALIBRATION"])) + assert (factors["Y"]["CALIBRATION"].to_numpy() == expected).all() def test_missing_bpms(tmp_path): - calibration.main(inputdir=MEASUREMENTS / 'missing_bpms', - outputdir=tmp_path, - method='beta', - ips=[1,5]) + calibration.main( + inputdir=MEASUREMENTS / "missing_bpms", outputdir=tmp_path, method="beta", ips=[1, 5] + ) factors = tfs.read(tmp_path / "calibration_beta_x.tfs", index="NAME") assert factors.loc["BPMWB.4R1.B1"]["CALIBRATION"] == 1 assert factors.loc["BPMWB.4L1.B1"]["CALIBRATION"] == 1 assert factors.loc["BPMS.2L1.B1"]["CALIBRATION"] != 1 - + def test_number_in_out(tmp_path): - tfs_in = tfs.read(MEASUREMENTS / 'for_beta' / 'beta_phase_x.tfs') - factors = calibration.main(inputdir=MEASUREMENTS / 'for_beta', - outputdir=tmp_path, - method='beta') + tfs_in = tfs.read(MEASUREMENTS / "for_beta" / "beta_phase_x.tfs") + factors = calibration.main( + inputdir=MEASUREMENTS / "for_beta", outputdir=tmp_path, method="beta" + ) assert len(factors["X"]) == len(tfs_in) def test_no_error_tracking(tmp_path): # Test with tracking data on ballistic optics at IR4 without noise - factors = calibration.main(inputdir=MEASUREMENTS / 'tracking', - outputdir=tmp_path, - ips=[4]) - - x_df = factors['X'].reset_index(drop=True) - y_df = factors['Y'].reset_index(drop=True) - ir4_x_df = factors['X'].reindex(BPMS[4][1]).reset_index(drop=True) - ir4_y_df = factors['X'].reindex(BPMS[4][1]).reset_index(drop=True) + factors = 
calibration.main(inputdir=MEASUREMENTS / "tracking", outputdir=tmp_path, ips=[4]) + + x_df = factors["X"].reset_index(drop=True) + y_df = factors["Y"].reset_index(drop=True) + ir4_x_df = factors["X"].reindex(BPMS[4][1]).reset_index(drop=True) + ir4_y_df = factors["X"].reindex(BPMS[4][1]).reset_index(drop=True) precision = 1e-3 - + # All factors ≃ 1 - expected = pd.Series([1.0] * len(factors['X']['CALIBRATION'])) - assert_series_equal(x_df['CALIBRATION'], expected, atol=precision, check_names=False) - assert_series_equal(y_df['CALIBRATION'], expected, atol=precision, check_names=False) - + expected = pd.Series([1.0] * len(factors["X"]["CALIBRATION"])) + assert_series_equal(x_df["CALIBRATION"], expected, atol=precision, check_names=False) + assert_series_equal(y_df["CALIBRATION"], expected, atol=precision, check_names=False) + # And their error ≃ - expected = pd.Series([0.0] * len(factors['X']['CALIBRATION'])) - assert_series_equal(x_df['ERROR_CALIBRATION'], expected, atol=precision, check_names=False) - assert_series_equal(y_df['ERROR_CALIBRATION'], expected, atol=precision, check_names=False) + expected = pd.Series([0.0] * len(factors["X"]["CALIBRATION"])) + assert_series_equal(x_df["ERROR_CALIBRATION"], expected, atol=precision, check_names=False) + assert_series_equal(y_df["ERROR_CALIBRATION"], expected, atol=precision, check_names=False) # Same with fit - expected = pd.Series([1.0] * len(ir4_x_df['CALIBRATION_FIT'])) - assert_series_equal(ir4_x_df['CALIBRATION_FIT'], expected, atol=precision, check_names=False) - assert_series_equal(ir4_y_df['CALIBRATION_FIT'], expected, atol=precision, check_names=False) - - # and its errors - expected = pd.Series([0.0] * len(ir4_x_df['ERROR_CALIBRATION_FIT'])) - assert_series_equal(ir4_x_df['ERROR_CALIBRATION_FIT'], expected, atol=precision, check_names=False) - assert_series_equal(ir4_y_df['ERROR_CALIBRATION_FIT'], expected, atol=precision, check_names=False) - + expected = pd.Series([1.0] * len(ir4_x_df["CALIBRATION_FIT"])) + assert_series_equal(ir4_x_df["CALIBRATION_FIT"], expected, atol=precision, check_names=False) + assert_series_equal(ir4_y_df["CALIBRATION_FIT"], expected, atol=precision, check_names=False) + # and its errors + expected = pd.Series([0.0] * len(ir4_x_df["ERROR_CALIBRATION_FIT"])) + assert_series_equal( + ir4_x_df["ERROR_CALIBRATION_FIT"], expected, atol=precision, check_names=False + ) + assert_series_equal( + ir4_y_df["ERROR_CALIBRATION_FIT"], expected, atol=precision, check_names=False + ) diff --git a/tests/unit/test_bsrt_analysis.py b/tests/unit/test_bsrt_analysis.py index 0d93f21..1d3d62c 100644 --- a/tests/unit/test_bsrt_analysis.py +++ b/tests/unit/test_bsrt_analysis.py @@ -1,8 +1,7 @@ from ast import literal_eval from pathlib import Path -import sys -import matplotlib +import matplotlib as mpl import numpy as np import pandas as pd import pytest @@ -11,18 +10,20 @@ from pylhc import bsrt_analysis # Forcing non-interactive Agg backend so rendering is done similarly across platforms during tests -matplotlib.use("Agg") +mpl.use("Agg") INPUTS_DIR = Path(__file__).parent.parent / "inputs" BSRT_INPUTS = INPUTS_DIR / "bsrt_analysis" BASELINE_DIR = str(INPUTS_DIR / "mpl_bsrt_baseline") + def test_bsrt_df(_bsrt_df): results = bsrt_analysis.main(directory=str(BSRT_INPUTS), beam="B1") assert_frame_equal( results["bsrt_df"].sort_index(axis=1), _bsrt_df.copy().sort_index(axis=1), - check_dtype=False, check_index_type=False + check_dtype=False, + check_index_type=False, ) diff --git a/tests/unit/test_forced_da_analysis.py 
b/tests/unit/test_forced_da_analysis.py index 7d221f2..034bc53 100644 --- a/tests/unit/test_forced_da_analysis.py +++ b/tests/unit/test_forced_da_analysis.py @@ -1,12 +1,12 @@ from pathlib import Path -import matplotlib +import matplotlib as mpl import pytest from pylhc.forced_da_analysis import main as fda_analysis # Forcing non-interactive Agg backend so rendering is done similarly across platforms during tests -matplotlib.use("Agg") +mpl.use("Agg") INPUT = Path(__file__).parent.parent / "inputs" @@ -53,8 +53,7 @@ def test_md3312_no_data_given(self, tmp_path): output_directory=tmp_path, ) - - def test_md2162_timberdb(tmp_path): + def test_md2162_timberdb(self, tmp_path): data_dir = INPUT / "kicks_horizontal_md2162" fda_analysis( fit="linear", @@ -73,6 +72,7 @@ def test_md2162_timberdb(tmp_path): # Helper ----------------------------------------------------------------------- + def check_output(output_dir: Path) -> None: assert len(list(output_dir.glob("*.pdf"))) == 5 assert len(list(output_dir.glob("*.tfs"))) == 4 diff --git a/tests/unit/test_lsa_to_madx.py b/tests/unit/test_lsa_to_madx.py index 0900320..1de4c6b 100644 --- a/tests/unit/test_lsa_to_madx.py +++ b/tests/unit/test_lsa_to_madx.py @@ -1,19 +1,15 @@ import json - from pathlib import Path -from typing import Dict -import numpy as np import pytest import tfs - from pandas._testing import assert_dict_equal from pylhc.lsa_to_madx import ( + _get_delta, _get_trim_variable, get_madx_script_from_definition_dataframe, parse_knobs_and_trim_values_from_file, - _get_delta, ) INPUTS_DIR = Path(__file__).parent.parent / "inputs" @@ -28,32 +24,39 @@ def test_parse_knob_definition_file(self, knobs_file, parsed_definitions): class TestMADXWriting: def test_madx_script_writing_from_definition_df(self, knob_definition_df, correct_madx_script): - script = get_madx_script_from_definition_dataframe(knob_definition_df, lsa_knob="LHCBEAM/ATS_Test_Knob") + script = get_madx_script_from_definition_dataframe( + knob_definition_df, lsa_knob="LHCBEAM/ATS_Test_Knob" + ) assert script == correct_madx_script def test_invalid_madx_characters(self): assert _get_trim_variable("hel&lo!-you2") == "trim_hello_you2" - @pytest.mark.parametrize("lsa_knob", ["LHCBEAM/Super_Duper_Long_Name_For_A_Knob_Will_Be_Truncated_For_Sure", "ATS_Test_Knob"]) + @pytest.mark.parametrize( + "lsa_knob", + ["LHCBEAM/Super_Duper_Long_Name_For_A_Knob_Will_Be_Truncated_For_Sure", "ATS_Test_Knob"], + ) def test_trim_variable_from_long_knob_name(self, lsa_knob): """Testing that the generated trim variable is correctly truncated if too long.""" assert ( _get_trim_variable("ATS_2022_05_08_B1_arc_by_arc_coupling_133cm_30cm") == "trim_22_05_08_B1_arc_by_arc_coupling_133cm_30cm" ) - assert _get_trim_variable("___knob") == "trim_knob" # make sure we handle several underscores + assert ( + _get_trim_variable("___knob") == "trim_knob" + ) # make sure we handle several underscores assert len(_get_trim_variable(lsa_knob)) < 48 def test_get_deltas_fails(self, knob_definition_df): - """ Tests that get_delta fails when both columns are present and populated""" + """Tests that get_delta fails when both columns are present and populated""" knob_definition_df["DELTA_KL"] = knob_definition_df.DELTA_K with pytest.raises(ValueError) as e: _get_delta(knob_definition_df) assert "DELTA_KL and DELTA_K" in str(e) def test_get_deltas_succeeds(self, knob_definition_df): - """ Tests that get_delta succeeds when both columns are present but only one has a number, - or when only one column is present""" + """Tests that 
get_delta succeeds when both columns are present but only one has a number, + or when only one column is present""" deltas = _get_delta(knob_definition_df) assert (deltas == knob_definition_df.DELTA_K).all() @@ -91,13 +94,14 @@ def knob_definition_df() -> tfs.TfsDataFrame: @pytest.fixture() -def parsed_definitions() -> Dict[str, float]: +def parsed_definitions() -> dict[str, float]: with (LSA_TO_MADX_INPUTS / "parsed_definitions.json").open("r") as f: - defs = json.load(f) - return defs + return json.load(f) @pytest.fixture def correct_madx_script() -> str: """Script for LHCBEAM/MD_ATS_2022_05_04_B1_RigidWaitsShift_IP1pos_knob with trim at +1""" - return (LSA_TO_MADX_INPUTS / "LHCBEAM_MD_ATS_2022_05_04_B1_RigidWaitsShift_IP1pos_knob.madx").read_text() + return ( + LSA_TO_MADX_INPUTS / "LHCBEAM_MD_ATS_2022_05_04_B1_RigidWaitsShift_IP1pos_knob.madx" + ).read_text()