diff --git a/doc/conf.py b/doc/conf.py index 028a0d1..5d9af52 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # optics_functions documentation build configuration file, created by # sphinx-quickstart on Tue Feb 6 12:10:18 2018. @@ -17,6 +16,7 @@ import pathlib import sys import warnings + warnings.filterwarnings("ignore", message="numpy.dtype size changed") warnings.filterwarnings("ignore", message="numpy.ufunc size changed") @@ -183,7 +183,7 @@ def about_package(init_posixpath: pathlib.Path) -> dict: # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, "optics_functions.tex", u"Optics Functions Documentation", u"OMC-TEAM", "manual"), + (master_doc, "optics_functions.tex", "Optics Functions Documentation", "OMC-TEAM", "manual"), ] @@ -191,7 +191,7 @@ def about_package(init_posixpath: pathlib.Path) -> dict: # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [(master_doc, "optics_functions", u"Optics Functions Documentation", [author], 1)] +man_pages = [(master_doc, "optics_functions", "Optics Functions Documentation", [author], 1)] # -- Options for Texinfo output ------------------------------------------- @@ -203,7 +203,7 @@ def about_package(init_posixpath: pathlib.Path) -> dict: ( master_doc, "optics_functions", - u"Optics Functions Documentation", + "Optics Functions Documentation", author, "OMC-TEAM", "One line description of project.", diff --git a/optics_functions/constants.py b/optics_functions/constants.py index 6061de5..39b29b0 100644 --- a/optics_functions/constants.py +++ b/optics_functions/constants.py @@ -4,14 +4,15 @@ Constants for the optics functions. """ + import numpy as np PI = np.pi PI2 = 2 * np.pi PI2I = 2j * np.pi PLANES = ("X", "Y") -PLANE_TO_NUM = dict(X=1, Y=2) -PLANE_TO_HV = dict(X="H", Y="V") +PLANE_TO_NUM = {"X": 1, "Y": 2} +PLANE_TO_HV = {"X": "H", "Y": "V"} # Columns & Headers ------------------------------------------------------------ NAME = "NAME" diff --git a/optics_functions/coupling.py b/optics_functions/coupling.py index e5d2201..4c8a8f7 100644 --- a/optics_functions/coupling.py +++ b/optics_functions/coupling.py @@ -5,20 +5,41 @@ Functions to estimate coupling from twiss dataframes and different methods to calculate the closest tune approach from the calculated coupling RDTs. """ + +from __future__ import annotations + import logging from contextlib import suppress -from typing import Sequence, Tuple +from typing import TYPE_CHECKING import numpy as np from pandas import DataFrame, Series from tfs import TfsDataFrame -from optics_functions.constants import (ALPHA, BETA, GAMMA, X, Y, TUNE, DELTA, - MINIMUM, PI2, PHASE_ADV, S, LENGTH, - IMAG, REAL, F1010, F1001) +from optics_functions.constants import ( + ALPHA, + BETA, + DELTA, + F1001, + F1010, + GAMMA, + IMAG, + LENGTH, + MINIMUM, + PHASE_ADV, + PI2, + REAL, + TUNE, + S, + X, + Y, +) from optics_functions.rdt import calculate_rdts from optics_functions.utils import split_complex_columns, timeit +if TYPE_CHECKING: + from collections.abc import Sequence + COUPLING_RDTS = [F1001, F1010] LOG = logging.getLogger(__name__) @@ -59,8 +80,11 @@ def coupling_via_rdts(df: TfsDataFrame, complex_columns: bool = True, **kwargs) return df_res -def coupling_via_cmatrix(df: DataFrame, complex_columns: bool = True, - output: Sequence[str] = ("rdts", "gamma", "cmatrix")) -> DataFrame: +def coupling_via_cmatrix( + df: DataFrame, + complex_columns: bool = True, + output: Sequence[str] = ("rdts", "gamma", "cmatrix"), +) -> DataFrame: """Calculates C matrix then Coupling and Gamma from it. See [CalagaBetatronCoupling2005]_ . @@ -173,10 +197,14 @@ def rmatrix_from_coupling(df: DataFrame, complex_columns: bool = True) -> DataFr # Eq. (15) if complex_columns: - abs_squared_diff = df[F1001].abs()**2 - df[F1010].abs()**2 + abs_squared_diff = df[F1001].abs() ** 2 - df[F1010].abs() ** 2 else: - abs_squared_diff = (df[f"{F1001}{REAL}"]**2 + df[f"{F1001}{IMAG}"]**2 - - df[f"{F1010}{REAL}"]**2 - df[f"{F1010}{IMAG}"]**2) + abs_squared_diff = ( + df[f"{F1001}{REAL}"] ** 2 + + df[f"{F1001}{IMAG}"] ** 2 + - df[f"{F1010}{REAL}"] ** 2 + - df[f"{F1010}{IMAG}"] ** 2 + ) gamma = np.sqrt(1.0 / (1.0 + 4.0 * abs_squared_diff)) @@ -209,6 +237,7 @@ def rmatrix_from_coupling(df: DataFrame, complex_columns: bool = True) -> DataFr # Closest Tune Approach -------------------------------------------------------- + def closest_tune_approach( df: TfsDataFrame, qx: float = None, qy: float = None, method: str = "teapot" ) -> TfsDataFrame: @@ -249,7 +278,9 @@ def closest_tune_approach( of the mean of this column. """ if F1001 not in df.columns: - raise KeyError(f"'{F1001}' column not in dataframe. Needed to calculated closest tune approach.") + raise KeyError( + f"'{F1001}' column not in dataframe. Needed to calculated closest tune approach." + ) method_map = { "teapot": _cta_teapot, # as named in [HoydalsvikEvaluationOfTheClosestTuneApproach2021]_ @@ -279,7 +310,7 @@ def closest_tune_approach( def _cta_franchi(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series: - """ Closest tune approach calculated by Eq. (1) in [PerssonImprovedControlCoupling2014]_ . """ + """Closest tune approach calculated by Eq. (1) in [PerssonImprovedControlCoupling2014]_ .""" return 4 * (qx_frac - qy_frac) * df[F1001].abs() @@ -294,21 +325,21 @@ def _cta_persson_alt(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series def _cta_persson(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series: - """ Closest tune approach calculated by Eq. (2) in [PerssonImprovedControlCoupling2014]_ . """ + """Closest tune approach calculated by Eq. (2) in [PerssonImprovedControlCoupling2014]_ .""" deltaq = qx_frac - qy_frac # fractional tune split location_term = np.exp(1j * PI2 * (deltaq * df[S] / (df.headers[LENGTH] / PI2))) return _cta_persson_alt(df, qx_frac, qy_frac) * location_term def _cta_hoydalsvik(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series: - """ Closest tune approach calculated by Eq. (14) in + """Closest tune approach calculated by Eq. (14) in [HoydalsvikEvaluationOfTheClosestTuneApproach2021]_ . This is like the persson estimate but divided by 1 + 4|F1001|^2 .""" return _cta_persson(df, qx_frac, qy_frac) / (1 + 4 * df[F1001].abs() ** 2) def _cta_hoydalsvik_alt(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series: - """ Closest tune approach calculated by Eq. (14) without the s-term in + """Closest tune approach calculated by Eq. (14) without the s-term in [HoydalsvikEvaluationOfTheClosestTuneApproach2021]_ . This is like the persson_alt estimate but divided by 1 + 4|F1001|^2 .""" return _cta_persson_alt(df, qx_frac, qy_frac) / (1 + 4 * df[F1001].abs() ** 2) @@ -332,20 +363,19 @@ def _cta_teapot(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series: def _cta_teapot_franchi(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series: """Closest tune approach calculated by Eq. (12) in [HoydalsvikEvaluationOfTheClosestTuneApproach2021]_ . - This is the teapot approach with the Franchi approximation. """ + This is the teapot approach with the Franchi approximation.""" return 4 * (qx_frac - qy_frac) * df[F1001].abs() / (1 + 4 * df[F1001].abs() ** 2) -def _get_weights_from_lengths(df: TfsDataFrame) -> Tuple[float, np.array]: - """Coefficients for the `persson` method. """ +def _get_weights_from_lengths(df: TfsDataFrame) -> tuple[float, np.ndarray]: + """Coefficients for the `persson` method.""" # approximate length of each element (ds in integral) s_periodic = np.zeros(len(df) + 1) s_periodic[1:] = df[S].to_numpy() s_periodic[0] = df[S].to_numpy()[-1] - df.headers[LENGTH] # weight ds/(2*pi*R) * N (as we take the mean afterwards) - weights = np.diff(s_periodic) / df.headers[LENGTH] * len(df) - return weights + return np.diff(s_periodic) / df.headers[LENGTH] * len(df) def check_resonance_relation(df: DataFrame, to_nan: bool = False) -> DataFrame: @@ -368,11 +398,15 @@ def check_resonance_relation(df: DataFrame, to_nan: bool = False) -> DataFrame: LOG.debug("Sum-resonance not in df, skipping resonance relation check.") return df - condition_not_fulfilled = df[F1001].abs() < df[F1010].abs() # comparison with NaN always yields False + condition_not_fulfilled = ( + df[F1001].abs() < df[F1010].abs() + ) # comparison with NaN always yields False if any(condition_not_fulfilled): - LOG.warning(f"In {sum(condition_not_fulfilled) / len(df.index) * 100}% " - "of the data points |F1001| < |F1010|. Your closest tune " - "approach estimates might not be accurate.") + LOG.warning( + f"In {sum(condition_not_fulfilled) / len(df.index) * 100}% " + "of the data points |F1001| < |F1010|. Your closest tune " + "approach estimates might not be accurate." + ) if to_nan: df.loc[condition_not_fulfilled, COUPLING_RDTS] = np.nan return df diff --git a/optics_functions/rdt.py b/optics_functions/rdt.py index a3c153e..279ec3b 100644 --- a/optics_functions/rdt.py +++ b/optics_functions/rdt.py @@ -4,29 +4,46 @@ Functions for the calculations of Resonance Driving Terms, as well as getting lists of valid driving term indices for certain orders. - """ + +from __future__ import annotations + import itertools import logging from math import factorial -from typing import Tuple, Sequence, List, Union +from typing import TYPE_CHECKING import numpy as np import pandas as pd from tfs import TfsDataFrame -from optics_functions.constants import PI2I, X, Y, BETA, TUNE -from optics_functions.utils import (seq2str, timeit, get_all_phase_advances, - dphi_at_element, dphi, i_pow, - split_complex_columns) +from optics_functions.constants import BETA, PI2I, TUNE, X, Y +from optics_functions.utils import ( + dphi, + dphi_at_element, + get_all_phase_advances, + i_pow, + seq2str, + split_complex_columns, + timeit, +) + +if TYPE_CHECKING: + from collections.abc import Sequence LOG = logging.getLogger(__name__) -def calculate_rdts(df: TfsDataFrame, rdts: Sequence[str], - qx: float = None, qy: float = None, feeddown: int = 0, - complex_columns: bool = True, loop_phases: bool = False, - hamiltionian_terms: bool = False) -> TfsDataFrame: +def calculate_rdts( + df: TfsDataFrame, + rdts: Sequence[str], + qx: float = None, + qy: float = None, + feeddown: int = 0, + complex_columns: bool = True, + loop_phases: bool = False, + hamiltionian_terms: bool = False, +) -> TfsDataFrame: r"""Calculates the Resonance Driving Terms. Eq. (A8) in [FranchiAnalyticFormulas2017]_ . @@ -64,10 +81,10 @@ def calculate_rdts(df: TfsDataFrame, rdts: Sequence[str], for rdt in rdts: rdt = rdt.upper() - if len(rdt) != 5 or rdt[0] != 'F': + if len(rdt) != 5 or rdt[0] != "F": raise ValueError(f"'{rdt:s}' does not seem to be a valid RDT name.") - j, k, l, m = [int(i) for i in rdt[1:]] + j, k, l, m = [int(i) for i in rdt[1:]] # noqa: E741 conj_rdt = jklm2str(k, j, m, l) if conj_rdt in df_res: @@ -80,17 +97,21 @@ def calculate_rdts(df: TfsDataFrame, rdts: Sequence[str], if n <= 1: raise ValueError(f"The RDT-order has to be >1 but was {n:d} for {rdt:s}") - denom_h = 1./(factorial(j) * factorial(k) * factorial(l) * factorial(m) * (2**n)) - denom_f = 1./(1. - np.exp(PI2I * ((j-k) * qx + (l-m) * qy))) + denom_h = 1.0 / ( + factorial(j) * factorial(k) * factorial(l) * factorial(m) * (2**n) + ) + denom_f = 1.0 / (1.0 - np.exp(PI2I * ((j - k) * qx + (l - m) * qy))) - betax = df[f"{BETA}{X}"]**(jk/2.) - betay = df[f"{BETA}{Y}"]**(lm/2.) + betax = df[f"{BETA}{X}"] ** (jk / 2.0) + betay = df[f"{BETA}{Y}"] ** (lm / 2.0) # Magnetic Field Strengths with Feed-Down - dx_idy = df[X] + 1j*df[Y] - k_complex = pd.Series(0j, index=df.index) # Complex sum of strenghts (from K_n + iJ_n) and feeddown to them + dx_idy = df[X] + 1j * df[Y] + k_complex = pd.Series( + 0j, index=df.index + ) # Complex sum of strenghts (from K_n + iJ_n) and feeddown to them - for q in range(feeddown+1): + for q in range(feeddown + 1): n_mad = n + q - 1 kl_iksl = df[f"K{n_mad:d}L"] + 1j * df[f"K{n_mad:d}SL"] k_complex += (kl_iksl * (dx_idy**q)) / factorial(q) @@ -98,7 +119,8 @@ def calculate_rdts(df: TfsDataFrame, rdts: Sequence[str], # real(i**lm * k+ij) is equivalent to Omega-function in paper, see Eq.(A11) # pd.Series is needed here, as np.real() returns numpy-array k_real = pd.Series(np.real(i_pow(lm) * k_complex), index=df.index) - sources = df.index[k_real != 0] # other elements do not contribute to integral, speedup summations + # Only select elements contributing to the integral, to speedup summations + sources = df.index[k_real != 0] if not len(sources): LOG.warning(f"No sources found for {rdt}. RDT will be zero.") @@ -118,13 +140,22 @@ def calculate_rdts(df: TfsDataFrame, rdts: Sequence[str], # index-intersection keeps `element` at correct place in index sources_plus = df.index.intersection(sources.union([element])) dphis = dphi_at_element(df.loc[sources_plus, :], element, qx, qy) - phase_term = np.exp(PI2I * ((j-k) * dphis[X].loc[sources] + (l-m) * dphis[Y].loc[sources])) + phase_term = np.exp( + PI2I + * ( + (j - k) * dphis[X].loc[sources] + + (l - m) * dphis[Y].loc[sources] + ) + ) h_jklm[element] = (h_terms * phase_term).sum() * denom_h else: - phx = dphi(phase_advances['X'].loc[sources, :], qx) - phy = dphi(phase_advances['Y'].loc[sources, :], qy) - phase_term = np.exp(PI2I * ((j-k) * phx + (l-m) * phy)) - h_jklm = phase_term.multiply(h_terms, axis="index").sum(axis="index").transpose() * denom_h + phx = dphi(phase_advances["X"].loc[sources, :], qx) + phy = dphi(phase_advances["Y"].loc[sources, :], qy) + phase_term = np.exp(PI2I * ((j - k) * phx + (l - m) * phy)) + h_jklm = ( + phase_term.multiply(h_terms, axis="index").sum(axis="index").transpose() + * denom_h + ) df_res[rdt] = h_jklm * denom_f LOG.info(f"Average RDT amplitude |{rdt:s}|: {df_res[rdt].abs().mean():g}") @@ -140,9 +171,14 @@ def calculate_rdts(df: TfsDataFrame, rdts: Sequence[str], return df_res -def get_ac_dipole_rdts(order_or_terms: Union[int, str, Sequence[str]], spectral_line: Tuple[int], - plane: str, ac_tunes: Tuple[float, float], acd_name: str): - """ Calculates the Hamiltonian Terms under Forced Motion. +def get_ac_dipole_rdts( + order_or_terms: int | str | Sequence[str], + spectral_line: tuple[int], + plane: str, + ac_tunes: tuple[float, float], + acd_name: str, +): + """Calculates the Hamiltonian Terms under Forced Motion. Args: order_or_terms (Union[int, str, Sequence[str]]): If an int is given all Resonance Driving @@ -156,24 +192,31 @@ def get_ac_dipole_rdts(order_or_terms: Union[int, str, Sequence[str]], spectral_ i.e. (0.302, 0.33) acd_name (str): The AC Dipole element name (?). """ - raise NotImplementedError("Todo. Leave it here so it's not forgotten. See (and improve) python2 code!") + raise NotImplementedError( + "Todo. Leave it here so it's not forgotten. See (and improve) python2 code!" + ) # RDT Definition Generation Functions ------------------------------------------ -def get_all_to_order(n: int) -> List[Tuple[int, int, int, int]]: - """ Returns list of all valid RDT jklm-tuple of order 2 to n """ + +def get_all_to_order(n: int) -> list[tuple[int, int, int, int]]: + """Returns list of all valid RDT jklm-tuple of order 2 to n""" if n <= 1: raise ValueError("'n' must be greater 1 for resonance driving terms.") - permut = [x for x in itertools.product(range(n + 1), repeat=4) - if 1 < sum(x) <= n and not (x[0] == x[1] and x[2] == x[3])] - return list(sorted(permut, key=sum)) + permut = [ + x + for x in itertools.product(range(n + 1), repeat=4) + if 1 < sum(x) <= n and not (x[0] == x[1] and x[2] == x[3]) + ] + return sorted(permut, key=sum) -def generator(orders: Sequence[int], normal: bool = True, - skew: bool = True, complex_conj: bool = True) -> dict: - """ Generates lists of RDT-4-tuples sorted into a dictionary by order. +def generator( + orders: Sequence[int], normal: bool = True, skew: bool = True, complex_conj: bool = True +) -> dict: + """Generates lists of RDT-4-tuples sorted into a dictionary by order. Args: orders (list): list of orders to be generated. Orders < 2 raise errors. @@ -185,7 +228,7 @@ def generator(orders: Sequence[int], normal: bool = True, Returns: Dictionary with keys of orders containing lists of 4-Tuples for the RDTs of that order. """ - if any([n <= 1 for n in orders]): + if any(n <= 1 for n in orders): raise ValueError("All order must be greater 1 for resonance driving terms.") if not (normal or skew): @@ -194,7 +237,8 @@ def generator(orders: Sequence[int], normal: bool = True, permut = {o: [] for o in orders} for x in itertools.product(range(max(orders) + 1), repeat=4): order = sum(x) - if ((order in orders) # check for order + if ( + (order in orders) # check for order and not (x[0] == x[1] and x[2] == x[3]) # rdt index rule and ((skew and sum(x[2:4]) % 2) or (normal and not sum(x[2:4]) % 2)) # skew or normal and (complex_conj or (x[1], x[0], x[3], x[2]) not in permut[order]) # filter conj @@ -205,11 +249,12 @@ def generator(orders: Sequence[int], normal: bool = True, # Other ------------------------------------------------------------------------ -def jklm2str(j: int, k: int, l: int, m: int) -> str: + +def jklm2str(j: int, k: int, l: int, m: int) -> str: # noqa: E741 return f"F{j:d}{k:d}{l:d}{m:d}" -def str2jklm(rdt: str) -> Tuple[int, ...]: +def str2jklm(rdt: str) -> tuple[int, ...]: return tuple(int(i) for i in rdt[1:]) diff --git a/optics_functions/utils.py b/optics_functions/utils.py index fcb2aae..f98df16 100644 --- a/optics_functions/utils.py +++ b/optics_functions/utils.py @@ -7,11 +7,12 @@ that are needed within multiple optics calculations. """ + import logging import string +from collections.abc import Iterable, Sequence from contextlib import contextmanager from time import time -from typing import Iterable, Sequence, Tuple import numpy as np import pandas as pd @@ -88,7 +89,9 @@ def prepare_twiss_dataframe( for name, df_old in (("twiss", df_twiss), ("errors", df_errors)): dropped_columns = set(df_old.columns) - set(df.columns) if dropped_columns: - LOG.warning(f"The following {name}-columns were dropped on merge: {seq2str(dropped_columns)}") + LOG.warning( + f"The following {name}-columns were dropped on merge: {seq2str(dropped_columns)}" + ) return df @@ -140,7 +143,7 @@ def merge_complex_columns( def switch_signs_for_beam4( df_twiss: pd.DataFrame, df_errors: pd.DataFrame = None -) -> Tuple[TfsDataFrame, TfsDataFrame]: +) -> tuple[TfsDataFrame, TfsDataFrame]: """Switch the signs for Beam 4 optics. This is due to the switch in direction for this beam and (anti-) symmetry after a rotation of 180deg around the y-axis of magnets, @@ -245,7 +248,7 @@ def dphi_at_element(df: pd.DataFrame, element: str, qx: float, qy: float) -> dic def add_missing_columns(df: pd.DataFrame, columns: Iterable) -> pd.DataFrame: - """ Check if `columns` are in `df` and add them all zero if not.""" + """Check if `columns` are in `df` and add them all zero if not.""" for col in columns: if col not in df.columns: LOG.debug(f"Added {col:s} with all zero to data-frame.") @@ -278,7 +281,7 @@ def timeit(text: str = "Time used {:.3f}s", print_fun=LOG.debug): def get_format_keys(format_str: str) -> list: - """ Get keys from format string. Unnamed placeholders are returned as empty strings.""" + """Get keys from format string. Unnamed placeholders are returned as empty strings.""" return [t[1] for t in string.Formatter().parse(format_str) if t[1] is not None] @@ -286,17 +289,17 @@ def get_format_keys(format_str: str) -> list: def seq2str(sequence: Iterable) -> str: - """ Converts an Iterable to string of its comma separated elements. """ + """Converts an Iterable to string of its comma separated elements.""" return ", ".join(str(item) for item in sequence) def i_pow(n: int) -> complex: - """ Calculates i**n in a quick and exact way. """ + """Calculates i**n in a quick and exact way.""" return 1j ** (n % 4) def set_name_index(df: pd.DataFrame, df_name="") -> pd.DataFrame: - """ Sets the NAME column as index if present and checks for string index. """ + """Sets the NAME column as index if present and checks for string index.""" if NAME in df.columns: df = df.set_index(NAME) diff --git a/pyproject.toml b/pyproject.toml index 5031ff6..c75dd81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,3 +80,57 @@ markers = [ # Helpful for pytest-debugging (leave commented out on commit): # log_cli=true # log_level=DEBUG + + +# ----- Dev Tools Configuration ----- # + +[tool.ruff] +exclude = [ + ".eggs", + ".git", + ".mypy_cache", + ".venv", + "_build", + "build", + "dist", +] + +# Assume Python 3.10+ +target-version = "py310" + +line-length = 100 +indent-width = 4 + +[tool.ruff.lint] +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" +ignore = [ + "E501", # line too long + "FBT001", # boolean-type-hint-positional-argument + "FBT002", # boolean-default-value-positional-argument + "PT019", # pytest-fixture-param-without-value (but suggested solution fails) +] +extend-select = [ + "F", # Pyflakes rules + "W", # PyCodeStyle warnings + "E", # PyCodeStyle errors + "I", # Sort imports properly + "A", # Detect shadowed builtins + "N", # enforce naming conventions, e.g. ClassName vs function_name + "UP", # Warn if certain things can changed due to newer Python versions + "C4", # Catch incorrect use of comprehensions, dict, list, etc + "FA", # Enforce from __future__ import annotations + "FBT", # detect boolean traps + "ISC", # Good use of string concatenation + "BLE", # disallow catch-all exceptions + "ICN", # Use common import conventions + "RET", # Good return practices + "SIM", # Common simplification rules + "TID", # Some good import practices + "TC", # Enforce importing certain types in a TYPE_CHECKING block + "PTH", # Use pathlib instead of os.path + "NPY", # Some numpy-specific things +] +# Allow fix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] diff --git a/tests/unit/debug_helper.py b/tests/unit/debug_helper.py index 83bd0e9..6acba20 100644 --- a/tests/unit/debug_helper.py +++ b/tests/unit/debug_helper.py @@ -1,11 +1,14 @@ """ Not at all needed for package, but helps for debugging. """ -from matplotlib import pyplot as plt # not in requirements + +import logging +import sys + import numpy as np -from optics_functions.constants import S, REAL, IMAG +from matplotlib import pyplot as plt # not in requirements -import logging, sys +from optics_functions.constants import IMAG, REAL, S def plot_rdts_vs_ptc(df_rdt, df_ptc_rdt, df_twiss, rdt_names): @@ -24,14 +27,19 @@ def plot_rdts_vs_ptc(df_rdt, df_ptc_rdt, df_twiss, rdt_names): axs[1].plot(df_twiss[S], np.imag(df_rdt[f"{rdt}"]), color="C1", label="Analytical") axs[1].set_ylabel(f"imag {rdt}") - axs[2].plot(df_twiss[S], np.sqrt(df_ptc_rdt[f"{rdt}{IMAG}"]**2 + df_ptc_rdt[f"{rdt}{REAL}"]**2), color="C0", label="PTC") + axs[2].plot( + df_twiss[S], + np.sqrt(df_ptc_rdt[f"{rdt}{IMAG}"] ** 2 + df_ptc_rdt[f"{rdt}{REAL}"] ** 2), + color="C0", + label="PTC", + ) axs[2].plot(df_twiss[S], np.abs(df_rdt[f"{rdt}"]), color="C1", label="Analytical") axs[2].set_ylabel(f"abs {rdt}") - axs[2].set_xlabel(f"Location [m]") + axs[2].set_xlabel("Location [m]") plt.show() -def plot_rdts_vs(df_rdt1, label1, df_rdt2, label2, df_twiss, rdt_names): +def plot_rdts_vs(df_rdt1, label1, df_rdt2, label2, df_twiss, rdt_names): for rdt in rdt_names: fig, axs = plt.subplots(3, 1) axs[0].plot(df_twiss[S], np.real(df_rdt1[f"{rdt}"]), color="C0", label=label1) @@ -46,13 +54,11 @@ def plot_rdts_vs(df_rdt1, label1, df_rdt2, label2, df_twiss, rdt_names): axs[2].plot(df_twiss[S], np.abs(df_rdt1[f"{rdt}"]), color="C0", label=label1) axs[2].plot(df_twiss[S], np.abs(df_rdt2[f"{rdt}"]), color="C1", label=label2) axs[2].set_ylabel(f"abs {rdt}") - axs[2].set_xlabel(f"Location [m]") + axs[2].set_xlabel("Location [m]") plt.show() def enable_logging(level=logging.DEBUG): logging.basicConfig( - stream=sys.stdout, - level=level, - format="%(levelname)7s | %(message)s | %(name)s" + stream=sys.stdout, level=level, format="%(levelname)7s | %(message)s | %(name)s" ) diff --git a/tests/unit/test_coupling.py b/tests/unit/test_coupling.py index daeb6cf..da6ca5b 100644 --- a/tests/unit/test_coupling.py +++ b/tests/unit/test_coupling.py @@ -1,4 +1,3 @@ -from collections import namedtuple from pathlib import Path import numpy as np @@ -9,7 +8,19 @@ from test_rdt import arrays_are_close_almost_everywhere from optics_functions.constants import ( - ALPHA, BETA, F1001, F1010, GAMMA, IMAG, NAME, PHASE_ADV, REAL, TUNE, S, X, Y + ALPHA, + BETA, + F1001, + F1010, + GAMMA, + IMAG, + NAME, + PHASE_ADV, + REAL, + TUNE, + S, + X, + Y, ) from optics_functions.coupling import ( COUPLING_RDTS, @@ -29,7 +40,6 @@ @pytest.mark.basic def test_cmatrix(): n = 5 - np.random.seed(487423872) df = generate_fake_data(n) df_res = coupling_via_cmatrix(df) @@ -37,7 +47,7 @@ def test_cmatrix(): assert not df_res.isna().any().any() # Checks based on CalagaBetatronCoupling2005 - detC = df_res["C11"] * df_res["C22"] - df_res["C12"] * df_res["C21"] + detC = df_res["C11"] * df_res["C22"] - df_res["C12"] * df_res["C21"] # noqa: N806 fsq_diff = df_res[F1001].abs() ** 2 - df_res[F1010].abs() ** 2 f_term = 1 / (1 + 4 * fsq_diff) g_sq = df_res[GAMMA] ** 2 @@ -52,10 +62,9 @@ def test_cmatrix(): @pytest.mark.parametrize("source", ["real", "fake"]) def test_rmatrix_to_coupling_to_rmatrix(source): if source == "fake": - np.random.seed(487423872) df = generate_fake_data(5) else: - df = tfs.read(INPUT / "coupling_bump" / f"twiss.lhc.b1.coupling_bump.tfs", index=NAME) + df = tfs.read(INPUT / "coupling_bump" / "twiss.lhc.b1.coupling_bump.tfs", index=NAME) df_coupling = coupling_via_cmatrix(df) for col in (f"{ALPHA}X", f"{BETA}X", f"{ALPHA}Y", f"{BETA}Y"): @@ -71,11 +80,11 @@ def test_rmatrix_to_coupling_to_rmatrix(source): @pytest.mark.basic def test_real_output(): n = 7 - np.random.seed(474987942) + rng = np.random.default_rng(474987942) df = generate_fake_data(n) df = prepare_twiss_dataframe(df_twiss=df) - df.loc[:, "K1L"] = np.random.rand(n) - df.loc[:, "K1SL"] = np.random.rand(n) + df.loc[:, "K1L"] = rng.random(n) + df.loc[:, "K1SL"] = rng.random(n) df_cmatrix = coupling_via_cmatrix(df, complex_columns=False) df_rdts = coupling_via_rdts(df, qx=1.31, qy=1.32, complex_columns=False) @@ -179,12 +188,24 @@ def generate_fake_data(n) -> tfs.TfsDataFrame: df = tfs.TfsDataFrame( 0.0, index=[str(i) for i in range(n)], - columns=[S, f"{ALPHA}{X}", f"{ALPHA}{Y}", f"{BETA}{X}", f"{BETA}{Y}", - f"{PHASE_ADV}{X}", f"{PHASE_ADV}{Y}", "R11", "R12", "R21", "R22"], + columns=[ + S, + f"{ALPHA}{X}", + f"{ALPHA}{Y}", + f"{BETA}{X}", + f"{BETA}{Y}", + f"{PHASE_ADV}{X}", + f"{PHASE_ADV}{Y}", + "R11", + "R12", + "R21", + "R22", + ], headers={f"{TUNE}1": qx, f"{TUNE}2": qy}, ) - r = np.random.rand(n) + rng = np.random.default_rng() + r = rng.random(n) df[S] = np.linspace(0, n, n) df.loc[:, "R11"] = np.sin(r) df.loc[:, "R22"] = r diff --git a/tests/unit/test_rdt.py b/tests/unit/test_rdt.py index e896eaa..e25100e 100644 --- a/tests/unit/test_rdt.py +++ b/tests/unit/test_rdt.py @@ -5,8 +5,8 @@ import pytest import tfs -from optics_functions.constants import PHASE_ADV, X, Y, BETA, S, TUNE, NAME, REAL, IMAG -from optics_functions.rdt import calculate_rdts, generator, get_all_to_order, str2jklm, jklm2str +from optics_functions.constants import BETA, IMAG, NAME, PHASE_ADV, REAL, TUNE, S, X, Y +from optics_functions.rdt import calculate_rdts, generator, get_all_to_order, jklm2str, str2jklm from optics_functions.utils import prepare_twiss_dataframe INPUT = Path(__file__).parent.parent / "inputs" @@ -33,15 +33,15 @@ def test_get_all_to_order(): @pytest.mark.basic def test_generator(): n = 5 - rdt_all = generator(list(range(2, n+1))) - rdt_skew = generator(list(range(2, n+1)), normal=False) - rdt_normal = generator(list(range(2, n+1)), skew=False) - rdt_cc = generator(list(range(2, n+1)), complex_conj=False) - - for o in rdt_all.keys(): - assert len(rdt_all[o]) == 2*len(rdt_skew[o]) - assert len(rdt_all[o]) == 2*len(rdt_normal[o]) - assert len(rdt_all[o]) == 2*len(rdt_cc[o]) + rdt_all = generator(list(range(2, n + 1))) + rdt_skew = generator(list(range(2, n + 1)), normal=False) + rdt_normal = generator(list(range(2, n + 1)), skew=False) + rdt_cc = generator(list(range(2, n + 1)), complex_conj=False) + + for o in rdt_all: + assert len(rdt_all[o]) == 2 * len(rdt_skew[o]) + assert len(rdt_all[o]) == 2 * len(rdt_normal[o]) + assert len(rdt_all[o]) == 2 * len(rdt_cc[o]) assert set(rdt_all[o]) == set(rdt_skew[o] + rdt_normal[o]) assert all(is_odd(sum(rdt[2:4])) for rdt in rdt_skew[o]) assert all(is_even(sum(rdt[2:4])) for rdt in rdt_normal[o]) @@ -52,7 +52,7 @@ def test_generator(): @pytest.mark.basic def test_generator_vs_all_to_order(): n = 5 - rdt_dict = generator(list(range(2, n+1))) + rdt_dict = generator(list(range(2, n + 1))) rdt_list = get_all_to_order(n) rdt_list2 = [] for o, v in rdt_dict.items(): @@ -121,32 +121,32 @@ def test_missing_columns(): @pytest.mark.basic def test_real_terms_and_hamiltonians(): - np.random.seed(2047294792) - n=12 + rng = np.random.default_rng(2047294792) + n = 12 df = get_df(n=n) df = prepare_twiss_dataframe(df_twiss=df) - df.loc[:, "K2L"] = np.random.rand(n) - df.loc[:, "K2SL"] = np.random.rand(n) + df.loc[:, "K2L"] = rng.random(n) + df.loc[:, "K2SL"] = rng.random(n) rdts = ["F1002", "F2001"] df_rdts = calculate_rdts(df, rdts=rdts, complex_columns=False, hamiltionian_terms=True) n_rdts = len(rdts) assert all(np.real(df_rdts) == df_rdts) assert len(df_rdts.columns) == 4 * n_rdts - assert df_rdts.columns.str.match(f".+{REAL}$").sum() == 2*n_rdts - assert df_rdts.columns.str.match(f".+{IMAG}$").sum() == 2*n_rdts - assert df_rdts.columns.str.match("F").sum() == 2*n_rdts - assert df_rdts.columns.str.match("H").sum() == 2*n_rdts + assert df_rdts.columns.str.match(f".+{REAL}$").sum() == 2 * n_rdts + assert df_rdts.columns.str.match(f".+{IMAG}$").sum() == 2 * n_rdts + assert df_rdts.columns.str.match("F").sum() == 2 * n_rdts + assert df_rdts.columns.str.match("H").sum() == 2 * n_rdts @pytest.mark.basic def test_rdts_save_memory(): - np.random.seed(2047294792) + rng = np.random.default_rng(2047294792) n = 10 df = get_df(n=n) df = prepare_twiss_dataframe(df_twiss=df) - df.loc[:, "K2L"] = np.random.rand(n) - df.loc[:, "K2SL"] = np.random.rand(n) + df.loc[:, "K2L"] = rng.random(n) + df.loc[:, "K2SL"] = rng.random(n) df_save = calculate_rdts(df, rdts=["F1002", "F2001"], loop_phases=True) df_notsave = calculate_rdts(df, rdts=["F1002", "F2001"], loop_phases=False) @@ -258,8 +258,8 @@ def test_rdts_normal_dodecapole_to_octupole_feeddown(): df["X"] = 1 df["Y"] = 0.5 df_rdts = calculate_rdts(df, rdts=["F1003", "F0004"], feeddown=2) - assert all((df_rdts["F0004"] - 0.375*df_rdts_comp["F0004"]).abs() <= 1e-15) - assert all(df_rdts["F1003"] == 0.5*df_rdts_comp["F1003"]) + assert all((df_rdts["F0004"] - 0.375 * df_rdts_comp["F0004"]).abs() <= 1e-15) + assert all(df_rdts["F1003"] == 0.5 * df_rdts_comp["F1003"]) @pytest.mark.extended @@ -272,7 +272,7 @@ def test_coupling_bump_sextupole_rdts(): df_rdt = calculate_rdts(df_twiss, rdt_names) for rdt in rdt_names: - rdt_ptc = df_ptc_rdt[f"{rdt}{REAL}"] + 1j*df_ptc_rdt[f"{rdt}{IMAG}"] + rdt_ptc = df_ptc_rdt[f"{rdt}{REAL}"] + 1j * df_ptc_rdt[f"{rdt}{IMAG}"] assert arrays_are_close_almost_everywhere(df_rdt[rdt], rdt_ptc, rtol=1e-2, percentile=0.9) @@ -280,18 +280,18 @@ def test_coupling_bump_sextupole_rdts(): def get_df(n): - """ Fake DF with nonsense values. """ + """Fake DF with nonsense values.""" qx, qy = 1.31, 1.32 phx, phy = f"{PHASE_ADV}{X}", f"{PHASE_ADV}{Y}" betax, betay = f"{BETA}{X}", f"{BETA}{Y}" df = tfs.TfsDataFrame( index=[str(i) for i in range(n)], columns=[S, X, Y, betax, betay, phx, phy], - headers={f"{TUNE}1": qx, f"{TUNE}2": qy} + headers={f"{TUNE}1": qx, f"{TUNE}2": qy}, ) df[S] = np.linspace(0, n, n) - df[phx] = np.linspace(0, qx, n+1)[:n] - df[phy] = np.linspace(0, qy, n+1)[:n] + df[phx] = np.linspace(0, qx, n + 1)[:n] + df[phy] = np.linspace(0, qy, n + 1)[:n] df[betax] = 1 df[betay] = 1 df[[X, Y]] = 0 @@ -300,9 +300,8 @@ def get_df(n): def get_absdiff_and_jumps(df_rdts): df_temp = pd.concat( - [pd.DataFrame(df_rdts), - pd.DataFrame(df_rdts.iloc[[0], :].to_numpy(), index=["temp"])], - axis="index" + [pd.DataFrame(df_rdts), pd.DataFrame(df_rdts.iloc[[0], :].to_numpy(), index=["temp"])], + axis="index", ) df_diff = df_temp.abs().diff().shift(-1).iloc[:-1, :] df_jump = df_diff.abs() > 1e-15 @@ -320,5 +319,7 @@ def is_even(n: int): def arrays_are_close_almost_everywhere(array1, array2, rtol=1e-2, atol=None, percentile=0.9): if atol is None: atol = rtol * np.mean(np.abs(array2)) - return sum(np.isclose(np.abs(array1), np.abs(array2), rtol=rtol, atol=0) | - np.isclose(np.abs(array1), np.abs(array2), rtol=0, atol=atol)) > percentile * len(array1) + return sum( + np.isclose(np.abs(array1), np.abs(array2), rtol=rtol, atol=0) + | np.isclose(np.abs(array1), np.abs(array2), rtol=0, atol=atol) + ) > percentile * len(array1) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 96efe9a..3da6353 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -6,11 +6,21 @@ import pytest import tfs -from optics_functions.constants import PHASE_ADV, Y, X, REAL, IMAG, NAME, DELTA_ORBIT, PLANES, S -from optics_functions.utils import (add_missing_columns, dphi, - get_all_phase_advances, tau, seq2str, i_pow, - prepare_twiss_dataframe, switch_signs_for_beam4, - get_format_keys, dphi_at_element, split_complex_columns, merge_complex_columns) +from optics_functions.constants import DELTA_ORBIT, IMAG, NAME, PHASE_ADV, PLANES, REAL, S, X, Y +from optics_functions.utils import ( + add_missing_columns, + dphi, + dphi_at_element, + get_all_phase_advances, + get_format_keys, + i_pow, + merge_complex_columns, + prepare_twiss_dataframe, + seq2str, + split_complex_columns, + switch_signs_for_beam4, + tau, +) INPUT = Path(__file__).parent.parent / "inputs" @@ -36,10 +46,10 @@ def test_i_pow(): for i in range(10): assert 1j**i == i_pow(i) - assert 1j**0 == i_pow(1000) - assert 1j**1 == i_pow(1001) - assert 1j**2 == i_pow(1002) - assert 1j**3 == i_pow(1003) + assert i_pow(1000) == 1j**0 + assert i_pow(1001) == 1j**1 + assert i_pow(1002) == 1j**2 + assert i_pow(1003) == 1j**3 assert i_pow(1) != i_pow(2) assert i_pow(0) == -i_pow(2) assert i_pow(1) == -i_pow(3) @@ -81,7 +91,7 @@ def test_get_all_phaseadvances(): @pytest.mark.basic def test_split_complex_columns(): - df = pd.DataFrame([1+2j, 3j + 4], columns=["Col"], index=["A", "B"]) + df = pd.DataFrame([1 + 2j, 3j + 4], columns=["Col"], index=["A", "B"]) df_split = split_complex_columns(df, df.columns, drop=False) assert len(df_split.columns) == 3 @@ -95,7 +105,7 @@ def test_split_complex_columns(): @pytest.mark.basic def test_merge_complex_columns(): - df = pd.DataFrame([1+2j, 3j + 4], columns=["Col"], index=["A", "B"]) + df = pd.DataFrame([1 + 2j, 3j + 4], columns=["Col"], index=["A", "B"]) df_split = split_complex_columns(df, df.columns, drop=True) df_merged = merge_complex_columns(df_split, df.columns, drop=False) @@ -115,7 +125,7 @@ def test_dphi(): phs_adv_dict = get_all_phase_advances(df) for q, adv in zip((qx, qy), phs_adv_dict.values()): dp = dphi(adv, q) - diff = dp-adv + diff = dp - adv assert (dp >= 0).all().all() assert (dp <= q).all().all() assert ((diff == 0) | (diff == q)).all().all() @@ -129,11 +139,11 @@ def test_tau(): phs_adv_dict = get_all_phase_advances(df) for q, adv in zip((qx, qy), phs_adv_dict.values()): dp = tau(adv, q) - diff = dp-adv - assert (dp >= -q/2).all().all() - assert (dp <= q/2).all().all() - assert ((diff == -q/2) | (diff == q/2)).all().all() - assert all(dp.loc[i, i] == q/2 for i in range(n)) + diff = dp - adv + assert (dp >= -q / 2).all().all() + assert (dp <= q / 2).all().all() + assert ((diff == -q / 2) | (diff == q / 2)).all().all() + assert all(dp.loc[i, i] == q / 2 for i in range(n)) @pytest.mark.basic @@ -179,14 +189,15 @@ def test_prepare_twiss_dataframe_inner(): n_index, n_kmax, n_valmax = 5, 6, 10 n_kmax_prepare = 5 df_twiss, _ = get_twiss_and_error_df(n_index, n_kmax, n_valmax) - _, df_errors_1 = get_twiss_and_error_df(n_index+3, n_kmax, n_valmax) - df = prepare_twiss_dataframe(df_twiss=df_twiss, df_errors=df_errors_1.iloc[3:, :], max_order=n_kmax_prepare, - join="inner") + _, df_errors_1 = get_twiss_and_error_df(n_index + 3, n_kmax, n_valmax) + df = prepare_twiss_dataframe( + df_twiss=df_twiss, df_errors=df_errors_1.iloc[3:, :], max_order=n_kmax_prepare, join="inner" + ) k_columns = df.columns[df.columns.str.match(r"^K\d+S?L")] assert len(k_columns) == n_kmax * 2 - assert len(df.index) == n_index-3 + assert len(df.index) == n_index - 3 assert all(df[S] == df_twiss.loc[df.index, S]) assert df.headers == df_twiss.headers @@ -198,9 +209,13 @@ def test_prepare_twiss_dataframe_outer(): n_index, n_kmax, n_valmax = 8, 6, 10 n_kmax_prepare = 16 _, df_errors = get_twiss_and_error_df(n_index, n_kmax, n_valmax) - df_twiss_1, _ = get_twiss_and_error_df(n_index, n_kmax+2, n_valmax) - df = prepare_twiss_dataframe(df_twiss=df_twiss_1.iloc[3:, :], df_errors=df_errors.iloc[:-3, :], - max_order=n_kmax_prepare, join="outer") + df_twiss_1, _ = get_twiss_and_error_df(n_index, n_kmax + 2, n_valmax) + df = prepare_twiss_dataframe( + df_twiss=df_twiss_1.iloc[3:, :], + df_errors=df_errors.iloc[:-3, :], + max_order=n_kmax_prepare, + join="outer", + ) k_columns = df.columns[df.columns.str.match(r"^K\d+S?L")] assert len(k_columns) == n_kmax_prepare * 2 @@ -238,15 +253,17 @@ def test_switch_signs_for_beam4(): df_twiss_b4, df_errors_b4 = get_twiss_and_error_df(n_index, n_kmax, n_valmax) df_twiss_b2, df_errors_b2 = switch_signs_for_beam4(df_twiss_b4, df_errors_b4) - switch_columns = [X,] # this needs to be correct!!! + switch_columns = [X] # this needs to be correct!!! for col in df_twiss_b4.columns: sign = -1 if col in switch_columns else 1 - assert df_twiss_b2[col].equals(sign*df_twiss_b4[col]) + assert df_twiss_b2[col].equals(sign * df_twiss_b4[col]) - switch_columns = [f"{DELTA_ORBIT}{X}"] + [f"K{o:d}{'' if o % 2 else 'S'}L" for o in range(n_kmax)] # this needs to be correct!!! + switch_columns = [f"{DELTA_ORBIT}{X}"] + [ + f"K{o:d}{'' if o % 2 else 'S'}L" for o in range(n_kmax) + ] # this needs to be correct!!! for col in df_errors_b4.columns: sign = -1 if col in switch_columns else 1 - assert df_errors_b2[col].equals(sign*df_errors_b4[col]) + assert df_errors_b2[col].equals(sign * df_errors_b4[col]) @pytest.mark.extended @@ -268,29 +285,37 @@ def test_switch_signs_for_beam4_madx_data(): # as the values are not exact and not all signs perfect: check if more signs are equal than before... # Not the most impressive test. Other ideas welcome. - assert ((np.sign(df_twiss_b4switched[twiss_cols]) == np.sign(df_twiss_b2[twiss_cols])).sum().sum() - > - (np.sign(df_twiss_b4[twiss_cols]) == np.sign(df_twiss_b2[twiss_cols])).sum().sum() + 0.9*len(df_twiss_b2)) + assert ( + np.sign(df_twiss_b4switched[twiss_cols]) == np.sign(df_twiss_b2[twiss_cols]) + ).sum().sum() > ( + np.sign(df_twiss_b4[twiss_cols]) == np.sign(df_twiss_b2[twiss_cols]) + ).sum().sum() + 0.9 * len(df_twiss_b2) + + assert ( + np.sign(df_errors_b4switched[err_cols]) == np.sign(df_errors_b2[err_cols]) + ).sum().sum() > ( + np.sign(df_errors_b4[err_cols]) == np.sign(df_errors_b2[err_cols]) + ).sum().sum() + len(df_twiss_b2) - assert ((np.sign(df_errors_b4switched[err_cols]) == np.sign(df_errors_b2[err_cols])).sum().sum() - > - (np.sign(df_errors_b4[err_cols]) == np.sign(df_errors_b2[err_cols])).sum().sum() + len(df_twiss_b2)) # Helper ----------------------------------------------------------------------- def get_twiss_and_error_df(n_index, n_kmax, n_valmax): + rng = np.random.default_rng() twiss_cols, err_cols = get_twiss_and_error_columns(n_kmax) - data = np.random.rand(n_index, len(twiss_cols)) * n_valmax - data[n_index // 2:, :] = -data[n_index // 2:, :] - - df_twiss = tfs.TfsDataFrame(data[:, :len(twiss_cols)], - index=list(string.ascii_uppercase[:n_index]), - columns=twiss_cols, - headers={"Just": "Some", "really": "nice", "header": 1111}) - df_errors = tfs.TfsDataFrame(data[:, :len(err_cols)], - index=list(string.ascii_uppercase[:n_index]), - columns=err_cols) + data = rng.random(size=(n_index, len(twiss_cols))) * n_valmax + data[n_index // 2 :, :] = -data[n_index // 2 :, :] + + df_twiss = tfs.TfsDataFrame( + data[:, : len(twiss_cols)], + index=list(string.ascii_uppercase[:n_index]), + columns=twiss_cols, + headers={"Just": "Some", "really": "nice", "header": 1111}, + ) + df_errors = tfs.TfsDataFrame( + data[:, : len(err_cols)], index=list(string.ascii_uppercase[:n_index]), columns=err_cols + ) df_twiss[S] = np.linspace(0, n_valmax, n_index) df_errors[S] = df_twiss[S].copy() return df_twiss, df_errors @@ -299,13 +324,13 @@ def get_twiss_and_error_df(n_index, n_kmax, n_valmax): def phase_df(n, qx, qy): phx, phy = f"{PHASE_ADV}{X}", f"{PHASE_ADV}{Y}" df = pd.DataFrame(columns=[phx, phy]) - df[phx] = np.linspace(0, qx, n+1)[:n] - df[phy] = np.linspace(0, qy, n+1)[:n] + df[phx] = np.linspace(0, qx, n + 1)[:n] + df[phy] = np.linspace(0, qy, n + 1)[:n] return df def get_twiss_and_error_columns(max_n): - k_cols = [f"K{n}{s}L" for n in range(max_n) for s in ('S', '')] + k_cols = [f"K{n}{s}L" for n in range(max_n) for s in ("S", "")] twiss_cols = [X, Y] + k_cols err_cols = [f"{DELTA_ORBIT}{X}", f"{DELTA_ORBIT}{Y}"] + k_cols return twiss_cols, err_cols