1 change: 0 additions & 1 deletion doc/conf.py
@@ -264,7 +264,6 @@ def about_package(init_posixpath: pathlib.Path) -> dict:
"pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
"matplotlib": ("https://matplotlib.org/stable/", None),
"scipy": ("https://docs.scipy.org/doc/scipy/", None),
"tfs-pandas": ("https://pylhc.github.io/tfs/", None),
"tfs": ("https://pylhc.github.io/tfs/", None),
"generic_parser": ("https://pylhc.github.io/generic_parser/", None),
"omc3": ("https://pylhc.github.io/omc3/", None),
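The removed "tfs-pandas" key duplicated the "tfs" entry, which points to the same documentation. For context, a minimal sketch (not part of this PR) of a docstring cross-reference that resolves through the remaining "tfs" intersphinx key; the function is purely illustrative:

import tfs

def example_frame() -> tfs.TfsDataFrame:
    """Return an empty `~tfs.TfsDataFrame` (the reference resolves via the "tfs" intersphinx entry)."""
    return tfs.TfsDataFrame()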
2 changes: 1 addition & 1 deletion pylhc/__init__.py
@@ -10,7 +10,7 @@
__title__ = "pylhc"
__description__ = "An accelerator physics script collection for the OMC team at CERN."
__url__ = "https://github.com/pylhc/pylhc"
__version__ = "0.8.0"
__version__ = "0.8.1"
__author__ = "pylhc"
__author_email__ = "[email protected]"
__license__ = "MIT"
6 changes: 4 additions & 2 deletions pylhc/constants/kickgroups.py
@@ -10,7 +10,9 @@

# Columns ---
KICKGROUP = "KICKGROUP"
JSON_FILE = "JSON"
SDDS = "SDDS"
FILL = "FILL"
TURNS = "TURNS"
BUNCH = "BUNCH"
UTCTIME = "UTC"
@@ -29,6 +31,6 @@
BEAMPROCESS = "BEAMPROCESS"
BEAM = "BEAM"

KICK_COLUMNS = [UTCTIME, LOCALTIME, TUNEX, TUNEY, DRIVEN_TUNEX, DRIVEN_TUNEY, DRIVEN_TUNEZ, AMPX, AMPY, AMPZ, TURNS, BUNCH, SDDS, BEAM, OPTICS, OPTICS_URI, BEAMPROCESS]
COLUMNS_TO_HEADERS = [BEAM, BUNCH, TURNS, BEAMPROCESS, OPTICS, OPTICS_URI]
KICK_COLUMNS = [UTCTIME, LOCALTIME, TUNEX, TUNEY, DRIVEN_TUNEX, DRIVEN_TUNEY, DRIVEN_TUNEZ, AMPX, AMPY, AMPZ, TURNS, BUNCH, SDDS, JSON_FILE, BEAM, FILL, OPTICS, OPTICS_URI, BEAMPROCESS]
COLUMNS_TO_HEADERS = [BEAM, FILL, BUNCH, TURNS, BEAMPROCESS, OPTICS, OPTICS_URI]
KICK_GROUP_COLUMNS = [UTCTIME, LOCALTIME, KICKGROUP, TIMESTAMP]
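Below is a minimal sketch (not part of this PR) of how these constants are meant to combine: KICK_COLUMNS defines the per-kick columns, while the COLUMNS_TO_HEADERS subset is constant within a KickGroup and can be promoted to TFS headers. The helper name and the promotion logic are assumptions for illustration only:

import tfs
from pylhc.constants.kickgroups import COLUMNS_TO_HEADERS


def _promote_columns_to_headers(df: tfs.TfsDataFrame) -> None:
    """Hypothetical helper: copy the (assumed constant) COLUMNS_TO_HEADERS values
    of the first kick into the TFS headers and drop them from the columns, in place."""
    for column in COLUMNS_TO_HEADERS:
        df.headers[column] = df[column].iloc[0]  # value of the first kick, assumed group-wide
    df.drop(columns=COLUMNS_TO_HEADERS, inplace=True)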
14 changes: 7 additions & 7 deletions pylhc/data_extract/lsa.py
@@ -159,9 +159,9 @@ def find_beamprocess_history(
return fills

def get_trim_history(
self, beamprocess: str, knobs: list,
start_time: AccDatetime = None, end_time: AccDatetime = None,
accelerator: str = "lhc"
self, beamprocess: str, knobs: list,
start_time: AccDatetime = None, end_time: AccDatetime = None,
accelerator: str = "lhc"
) -> dict:
"""
Get trim history for knobs between specified times.
@@ -187,10 +187,10 @@ def get_trim_history(
raise ValueError("None of the given knobs exist!")

if start_time is not None:
start_time = start_time.timestamp()
start_time = start_time.timestamp()

if end_time is not None:
end_time = end_time.timestamp()
end_time = end_time.timestamp()

LOG.debug(f"Getting trims for {len(knobs)} knobs.")
try:
@@ -259,7 +259,7 @@ def get_knob_circuits(self, knob_name: str, optics: str) -> tfs.TfsDataFrame:
"""
        Get a dataframe of the structure of the knob. Similar to the online model extractor
        (KnobExtractor.getKnobHiercarchy)

Args:
knob_name: name of the knob.
optics: name of the optics.
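The re-indented block in get_trim_history above converts the optional start_time/end_time arguments to POSIX timestamps only when they are given. A small self-contained sketch of that pattern, using plain datetime objects as stand-ins for the AccDatetime type the method expects:

from datetime import datetime, timezone


def _to_timestamp(time: datetime | None) -> float | None:
    """Mirror of the optional-time handling in get_trim_history: convert to a POSIX
    timestamp when a time is given, otherwise pass None through."""
    return None if time is None else time.timestamp()


start = _to_timestamp(datetime(2022, 5, 1, 12, 0, tzinfo=timezone.utc))  # -> float
end = _to_timestamp(None)  # no upper bound -> stays None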
121 changes: 100 additions & 21 deletions pylhc/kickgroups.py
@@ -61,7 +61,6 @@

from datetime import datetime
from pathlib import Path
from typing import List, Union

import numpy as np
import pandas as pd
@@ -83,6 +82,8 @@
DRIVEN_TUNEX,
DRIVEN_TUNEY,
DRIVEN_TUNEZ,
FILL,
JSON_FILE,
KICK_COLUMNS,
KICK_GROUP_COLUMNS,
KICKGROUP,
@@ -105,7 +106,7 @@
# List Kickgroups --------------------------------------------------------------


def list_available_kickgroups(by: str = TIMESTAMP, root: Union[Path, str] = KICKGROUPS_ROOT, printout: bool = True) -> DataFrame:
def list_available_kickgroups(by: str = TIMESTAMP, root: Path | str = KICKGROUPS_ROOT, printout: bool = True) -> DataFrame:
"""
    List all available KickGroups in `root` with optional sorting.

@@ -139,11 +139,11 @@ def list_available_kickgroups(by: str = TIMESTAMP, root: Union[Path, str] = KICK
return df_info
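A short usage sketch for the updated signature (this assumes access to the kickgroups directory; KICKGROUPS_ROOT is the default NFS location):

from pylhc.constants.kickgroups import TIMESTAMP
from pylhc.kickgroups import list_available_kickgroups

# Sort by timestamp and return the DataFrame without printing it.
kickgroups = list_available_kickgroups(by=TIMESTAMP, printout=False)
print(kickgroups.head())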


def get_folder_json_files(root: Union[Path, str] = KICKGROUPS_ROOT) -> List[Path]:
def get_folder_json_files(root: Path | str = KICKGROUPS_ROOT) -> list[Path]:
"""Returns a list of all **.json** files in the folder.

Args:
root (Union[Path, str])): the path to the folder. (Defaults
        root (Path | str): the path to the folder. (Defaults
to the ``NFS`` path of our kickgroups).

Returns:
@@ -157,7 +157,7 @@ def get_folder_json_files(root: Union[Path, str] = KICKGROUPS_ROOT) -> List[Path
# Kickgroup Info ---------------------------------------------------------------


def get_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT) -> TfsDataFrame:
def get_kickgroup_info(kick_group: str, root: Path | str = KICKGROUPS_ROOT) -> TfsDataFrame:
"""
Gather all important info about the KickGroup into a `~tfs.TfsDataFrame`.

@@ -176,7 +177,7 @@ def get_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT
df_info = TfsDataFrame(index=range(len(kicks_files)), columns=KICK_COLUMNS, headers={KICKGROUP: kick_group})

if not len(kicks_files):
raise FileNotFoundError(f"KickGroup {kick_group} contains no kicks.")
raise ValueError(f"KickGroup {kick_group} contains no kicks.")

for idx, kf in enumerate(kicks_files):
df_info.loc[idx, :] = load_kickfile(kf)
@@ -187,24 +188,34 @@ def get_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT
return df_info
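A usage sketch for get_kickgroup_info (the group name is made up; access to the kickgroups directory on NFS is required):

from pylhc.kickgroups import get_kickgroup_info

kicks = get_kickgroup_info(kick_group="SOME_KICKGROUP_NAME")  # hypothetical group name
print(kicks.headers)           # contains the KICKGROUP name (plus the group-wide quantities)
print(kicks[["UTC", "SDDS"]])  # per-kick columns, see KICK_COLUMNS in the constants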


def load_kickfile(kickfile: Union[Path, str]) -> pd.Series:
def load_kickfile(kickfile: Path | str) -> pd.Series:
"""
Load the important data from a **json** kickfile into a `~pandas.Series`.

Args:
kickfile (Union[Path, str]): the path to the kickfile to load data from.
kickfile (Path | str): the path to the kickfile to load data from.

Returns:
A `~pandas.Series` with the relevant information loaded from the provided
*kickfile*. The various entries in the Series are defined in `pylhc.constants.kickgroups`
as ``KICK_COLUMNS``.
"""
LOG.debug(f"Loading kick information from Kickfile at '{Path(kickfile).absolute()}'")
kickfile = _find_existing_file_path(kickfile)
kick = _load_json(kickfile)

data = pd.Series(index=KICK_COLUMNS, dtype=object)
data[JSON_FILE] = _find_existing_file_path(kickfile)
data[LOCALTIME] = _jsontime_to_datetime(kick["acquisitionTime"])
data[UTCTIME] = _local_to_utc(data[LOCALTIME])
data[SDDS] = kick["sddsFile"]

try:
data[SDDS] = _find_existing_file_path(kick["sddsFile"])
except FileNotFoundError as e:
LOG.warning(str(e))
data[SDDS] = None

data[FILL] = _get_fill_from_path(kick["sddsFile"]) # TODO: Ask OP to include in json?
data[BEAM] = kick["measurementEnvironment"]["lhcBeam"]["beamName"]
data[BEAMPROCESS] = kick["measurementEnvironment"]["environmentContext"]["name"]
data[TURNS] = kick["acqSettings"]["capturedTurns"]
@@ -229,27 +240,85 @@ def load_kickfile(kickfile: Union[Path, str]) -> pd.Series:
data[AMPZ] = kick["excitationSettings"][0]["longitudinalRfSettings"]["excitationAmplitude"]
else:
LOG.debug("Kick is 2D Excitation, longitudinal settings will be set as NaNs")
idx = _get_plane_index(kick["excitationSettings"], "X")
idy = _get_plane_index(kick["excitationSettings"], "Y")
entry_map = {"X": (TUNEX, DRIVEN_TUNEX, AMPX), "Y": (TUNEY, DRIVEN_TUNEY, AMPY)}
for plane in ["X", "Y"]:
tune, driven_tune, amp = entry_map[plane]

data[tune] = np.NaN
data[driven_tune] = np.NaN
data[amp] = np.NaN

try:
idx = _get_plane_index(kick["excitationSettings"], plane)
except ValueError as e:
LOG.warning(f"{str(e)} in {kickfile}")
continue

if "measuredTune" not in kick["excitationSettings"][idx]: # Happens in very early files in 2022
LOG.warning(f"No measured tune {plane} in the kick file: {kickfile}")
continue

data[tune] = kick["excitationSettings"][idx]["measuredTune"]
data[driven_tune] = data[tune] + _get_delta_tune(kick, idx)
data[amp] = kick["excitationSettings"][idx]["amplitude"]

data[TUNEX] = kick["excitationSettings"][idx]["measuredTune"]
data[TUNEY] = kick["excitationSettings"][idy]["measuredTune"]
data[DRIVEN_TUNEX] = data[TUNEX] + kick["excitationSettings"][idx]["deltaTuneStart"]
data[DRIVEN_TUNEY] = data[TUNEY] + kick["excitationSettings"][idy]["deltaTuneStart"]
data[DRIVEN_TUNEZ] = np.NaN
data[AMPX] = kick["excitationSettings"][idx]["amplitude"]
data[AMPY] = kick["excitationSettings"][idy]["amplitude"]
data[AMPZ] = np.NaN

return data
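A usage sketch for a single kick file (the path is hypothetical; the column names are the string values defined in pylhc.constants.kickgroups):

from pylhc.kickgroups import load_kickfile

kickfile = "/some/nfs/path/FILL_DATA/7500/kick_acquisition.json"  # hypothetical path
kick = load_kickfile(kickfile)  # returns a pandas Series indexed by KICK_COLUMNS
print(kick["FILL"], kick["BEAM"], kick["SDDS"])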

def _get_delta_tune(kick: dict, idx_plane: int) -> float:
""" Return the delta from the tune for the kicks.
For some reason, there are multiple different keys where this can be stored. """

    # Default key for ACDipole ---
    # There is also "deltaTuneEnd", but we usually don't change the delta during the kick
try:
return kick["excitationSettings"][idx_plane]["deltaTuneStart"]
except KeyError:
pass

# Key for ADTACDipole ---
try:
return kick["excitationSettings"][idx_plane]["deltaTune"]
except KeyError:
pass

# Another key for ADTACDipole (unclear to me why) ---
try:
return kick["excitationSettings"][idx_plane]["deltaTuneOffset"]
except KeyError:
pass

raise KeyError(f"Could not find delta tune for plane-entry {idx_plane}")
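A minimal illustration of the three key variants handled above; the dictionaries are hand-made stand-ins for entries of a kick file's "excitationSettings" list, not real acquisition data:

from pylhc.kickgroups import _get_delta_tune

acd_entry = {"measuredTune": 0.28, "deltaTuneStart": 0.012}        # AC dipole key
adt_entry = {"measuredTune": 0.31, "deltaTune": -0.010}            # ADT-AC dipole key
adt_alt_entry = {"measuredTune": 0.31, "deltaTuneOffset": -0.010}  # alternative ADT key

for entry in (acd_entry, adt_entry, adt_alt_entry):
    print(_get_delta_tune({"excitationSettings": [entry]}, idx_plane=0))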


def _find_existing_file_path(path: str | Path) -> Path:
    """ Return an existing path for the given file, checking the end-of-year
    ALL_FILL_DATA location if the original one no longer exists. """
path = Path(path)
if path.is_file():
return path

fill_data = "FILL_DATA"
all_fill_data = "ALL_FILL_DATA"

if fill_data in path.parts:
        # Fills are moved to ALL_FILL_DATA at the end of the year
idx = path.parts.index(fill_data)+1
new_path = Path(*path.parts[:idx], all_fill_data, *path.parts[idx:])
if new_path.exists():
return new_path

raise FileNotFoundError(f"Could not find kick file at {path}")
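An illustration of the end-of-year remapping performed above (the path is hypothetical; on a machine where neither location exists, the lookup fails as shown):

from pylhc.kickgroups import _find_existing_file_path

# The helper first tries the recorded location, then the same path with
# ALL_FILL_DATA inserted after FILL_DATA, before giving up.
try:
    sdds_path = _find_existing_file_path("/some/nfs/path/FILL_DATA/7500/BEAM1/kick.sdds")
except FileNotFoundError as error:
    print(error)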



# Functions with console output ---

# Full Info -


def show_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT) -> None:
def show_kickgroup_info(kick_group: str, root: Path | str = KICKGROUPS_ROOT) -> None:
"""
Wrapper around `~pylhc.kickgroups.get_kickgroup_info`, gathering the relevant
information from the kick files in the group and printing it to console.
Expand Down Expand Up @@ -284,7 +353,7 @@ def _print_kickgroup_info(kicks_info: TfsDataFrame) -> None:
# Files only -


def show_kickgroup_files(kick_group: str, nfiles: int = None, root: Union[Path, str] = KICKGROUPS_ROOT) -> None:
def show_kickgroup_files(kick_group: str, nfiles: int = None, root: Path | str = KICKGROUPS_ROOT) -> None:
"""
Wrapper around `pylhc.kickgroups.get_kickgroup_info`, gathering the relevant
information from all kickfiles in the KickGroup and printing only the sdds-filepaths
@@ -337,7 +406,7 @@ def _print_kickgroup_files(kicks_info: TfsDataFrame, nfiles: int = None) -> None
# IO ---


def _load_json(jsonfile: Union[Path, str]) -> dict:
def _load_json(jsonfile: Path | str) -> dict:
return json.loads(Path(jsonfile).read_text())


@@ -371,7 +440,7 @@ def _local_to_utc(dt: datetime):
# Other ---


def _get_plane_index(data: List[dict], plane: str) -> str:
def _get_plane_index(data: list[dict], plane: str) -> int:
"""
Find the index for the given plane in the data list.
This is necessary as they are not always in X,Y order.
@@ -384,6 +453,16 @@ def _get_plane_index(data: List[dict], plane: str) -> str:
raise ValueError(f"Plane '{plane}' not found in data.")


def _get_fill_from_path(sdds_path: str | Path) -> int:
""" Get the fill number from the path to the sdds file.
Note: Not sure why the fill is not saved automatically into the .json file.
Maybe we should ask OP to include this.
"""
parts = Path(sdds_path).parts
idx_parent = parts.index("FILL_DATA")
return int(parts[idx_parent + 1])
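A quick check of the fill extraction (the path is hypothetical, but follows the FILL_DATA/<fill>/... layout the function relies on):

from pylhc.kickgroups import _get_fill_from_path

print(_get_fill_from_path("/some/nfs/path/FILL_DATA/7920/BEAM2/kick.sdds"))  # 7920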


# Script Mode ------------------------------------------------------------------

