Skip to content
Merged
8 changes: 4 additions & 4 deletions doc/conf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# optics_functions documentation build configuration file, created by
# sphinx-quickstart on Tue Feb 6 12:10:18 2018.
Expand All @@ -17,6 +16,7 @@
import pathlib
import sys
import warnings

warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")

Expand Down Expand Up @@ -183,15 +183,15 @@ def about_package(init_posixpath: pathlib.Path) -> dict:
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, "optics_functions.tex", u"Optics Functions Documentation", u"OMC-TEAM", "manual"),
(master_doc, "optics_functions.tex", "Optics Functions Documentation", "OMC-TEAM", "manual"),
]


# -- Options for manual page output ---------------------------------------

# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [(master_doc, "optics_functions", u"Optics Functions Documentation", [author], 1)]
man_pages = [(master_doc, "optics_functions", "Optics Functions Documentation", [author], 1)]


# -- Options for Texinfo output -------------------------------------------
Expand All @@ -203,7 +203,7 @@ def about_package(init_posixpath: pathlib.Path) -> dict:
(
master_doc,
"optics_functions",
u"Optics Functions Documentation",
"Optics Functions Documentation",
author,
"OMC-TEAM",
"One line description of project.",
Expand Down
5 changes: 3 additions & 2 deletions optics_functions/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

Constants for the optics functions.
"""

import numpy as np

PI = np.pi
PI2 = 2 * np.pi
PI2I = 2j * np.pi
PLANES = ("X", "Y")
PLANE_TO_NUM = dict(X=1, Y=2)
PLANE_TO_HV = dict(X="H", Y="V")
PLANE_TO_NUM = {"X": 1, "Y": 2}
PLANE_TO_HV = {"X": "H", "Y": "V"}

# Columns & Headers ------------------------------------------------------------
NAME = "NAME"
Expand Down
75 changes: 52 additions & 23 deletions optics_functions/coupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,33 @@
Functions to estimate coupling from twiss dataframes and different methods to calculate the closest tune
approach from the calculated coupling RDTs.
"""

import logging
from collections.abc import Sequence
from contextlib import suppress
from typing import Sequence, Tuple

import numpy as np
from pandas import DataFrame, Series
from tfs import TfsDataFrame

from optics_functions.constants import (ALPHA, BETA, GAMMA, X, Y, TUNE, DELTA,
MINIMUM, PI2, PHASE_ADV, S, LENGTH,
IMAG, REAL, F1010, F1001)
from optics_functions.constants import (
ALPHA,
BETA,
DELTA,
F1001,
F1010,
GAMMA,
IMAG,
LENGTH,
MINIMUM,
PHASE_ADV,
PI2,
REAL,
TUNE,
S,
X,
Y,
)
from optics_functions.rdt import calculate_rdts
from optics_functions.utils import split_complex_columns, timeit

Expand Down Expand Up @@ -59,8 +75,11 @@ def coupling_via_rdts(df: TfsDataFrame, complex_columns: bool = True, **kwargs)
return df_res


def coupling_via_cmatrix(df: DataFrame, complex_columns: bool = True,
output: Sequence[str] = ("rdts", "gamma", "cmatrix")) -> DataFrame:
def coupling_via_cmatrix(
df: DataFrame,
complex_columns: bool = True,
output: Sequence[str] = ("rdts", "gamma", "cmatrix"),
) -> DataFrame:
"""Calculates C matrix then Coupling and Gamma from it.
See [CalagaBetatronCoupling2005]_ .

Expand Down Expand Up @@ -173,10 +192,14 @@ def rmatrix_from_coupling(df: DataFrame, complex_columns: bool = True) -> DataFr

# Eq. (15)
if complex_columns:
abs_squared_diff = df[F1001].abs()**2 - df[F1010].abs()**2
abs_squared_diff = df[F1001].abs() ** 2 - df[F1010].abs() ** 2
else:
abs_squared_diff = (df[f"{F1001}{REAL}"]**2 + df[f"{F1001}{IMAG}"]**2 -
df[f"{F1010}{REAL}"]**2 - df[f"{F1010}{IMAG}"]**2)
abs_squared_diff = (
df[f"{F1001}{REAL}"] ** 2
+ df[f"{F1001}{IMAG}"] ** 2
- df[f"{F1010}{REAL}"] ** 2
- df[f"{F1010}{IMAG}"] ** 2
)

gamma = np.sqrt(1.0 / (1.0 + 4.0 * abs_squared_diff))

Expand Down Expand Up @@ -209,6 +232,7 @@ def rmatrix_from_coupling(df: DataFrame, complex_columns: bool = True) -> DataFr

# Closest Tune Approach --------------------------------------------------------


def closest_tune_approach(
df: TfsDataFrame, qx: float = None, qy: float = None, method: str = "teapot"
) -> TfsDataFrame:
Expand Down Expand Up @@ -249,7 +273,9 @@ def closest_tune_approach(
of the mean of this column.
"""
if F1001 not in df.columns:
raise KeyError(f"'{F1001}' column not in dataframe. Needed to calculated closest tune approach.")
raise KeyError(
f"'{F1001}' column not in dataframe. Needed to calculated closest tune approach."
)

method_map = {
"teapot": _cta_teapot, # as named in [HoydalsvikEvaluationOfTheClosestTuneApproach2021]_
Expand Down Expand Up @@ -279,7 +305,7 @@ def closest_tune_approach(


def _cta_franchi(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series:
""" Closest tune approach calculated by Eq. (1) in [PerssonImprovedControlCoupling2014]_ . """
"""Closest tune approach calculated by Eq. (1) in [PerssonImprovedControlCoupling2014]_ ."""
return 4 * (qx_frac - qy_frac) * df[F1001].abs()


Expand All @@ -294,21 +320,21 @@ def _cta_persson_alt(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series


def _cta_persson(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series:
""" Closest tune approach calculated by Eq. (2) in [PerssonImprovedControlCoupling2014]_ . """
"""Closest tune approach calculated by Eq. (2) in [PerssonImprovedControlCoupling2014]_ ."""
deltaq = qx_frac - qy_frac # fractional tune split
location_term = np.exp(1j * PI2 * (deltaq * df[S] / (df.headers[LENGTH] / PI2)))
return _cta_persson_alt(df, qx_frac, qy_frac) * location_term


def _cta_hoydalsvik(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series:
""" Closest tune approach calculated by Eq. (14) in
"""Closest tune approach calculated by Eq. (14) in
[HoydalsvikEvaluationOfTheClosestTuneApproach2021]_ .
This is like the persson estimate but divided by 1 + 4|F1001|^2 ."""
return _cta_persson(df, qx_frac, qy_frac) / (1 + 4 * df[F1001].abs() ** 2)


def _cta_hoydalsvik_alt(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series:
""" Closest tune approach calculated by Eq. (14) without the s-term in
"""Closest tune approach calculated by Eq. (14) without the s-term in
[HoydalsvikEvaluationOfTheClosestTuneApproach2021]_ .
This is like the persson_alt estimate but divided by 1 + 4|F1001|^2 ."""
return _cta_persson_alt(df, qx_frac, qy_frac) / (1 + 4 * df[F1001].abs() ** 2)
Expand All @@ -332,20 +358,19 @@ def _cta_teapot(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series:
def _cta_teapot_franchi(df: TfsDataFrame, qx_frac: float, qy_frac: float) -> Series:
"""Closest tune approach calculated by Eq. (12) in
[HoydalsvikEvaluationOfTheClosestTuneApproach2021]_ .
This is the teapot approach with the Franchi approximation. """
This is the teapot approach with the Franchi approximation."""
return 4 * (qx_frac - qy_frac) * df[F1001].abs() / (1 + 4 * df[F1001].abs() ** 2)


def _get_weights_from_lengths(df: TfsDataFrame) -> Tuple[float, np.array]:
"""Coefficients for the `persson` method. """
def _get_weights_from_lengths(df: TfsDataFrame) -> tuple[float, np.ndarray]:
"""Coefficients for the `persson` method."""
# approximate length of each element (ds in integral)
s_periodic = np.zeros(len(df) + 1)
s_periodic[1:] = df[S].to_numpy()
s_periodic[0] = df[S].to_numpy()[-1] - df.headers[LENGTH]

# weight ds/(2*pi*R) * N (as we take the mean afterwards)
weights = np.diff(s_periodic) / df.headers[LENGTH] * len(df)
return weights
return np.diff(s_periodic) / df.headers[LENGTH] * len(df)


def check_resonance_relation(df: DataFrame, to_nan: bool = False) -> DataFrame:
Expand All @@ -368,11 +393,15 @@ def check_resonance_relation(df: DataFrame, to_nan: bool = False) -> DataFrame:
LOG.debug("Sum-resonance not in df, skipping resonance relation check.")
return df

condition_not_fulfilled = df[F1001].abs() < df[F1010].abs() # comparison with NaN always yields False
condition_not_fulfilled = (
df[F1001].abs() < df[F1010].abs()
) # comparison with NaN always yields False
if any(condition_not_fulfilled):
LOG.warning(f"In {sum(condition_not_fulfilled) / len(df.index) * 100}% "
"of the data points |F1001| < |F1010|. Your closest tune "
"approach estimates might not be accurate.")
LOG.warning(
f"In {sum(condition_not_fulfilled) / len(df.index) * 100}% "
"of the data points |F1001| < |F1010|. Your closest tune "
"approach estimates might not be accurate."
)
if to_nan:
df.loc[condition_not_fulfilled, COUPLING_RDTS] = np.nan
return df
Loading