Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ def about_package(init_posixpath: pathlib.Path) -> dict:
"pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
"matplotlib": ("https://matplotlib.org/stable/", None),
"scipy": ("https://docs.scipy.org/doc/scipy/", None),
"tfs-pandas": ("https://pylhc.github.io/tfs/", None),
"tfs": ("https://pylhc.github.io/tfs/", None),
"generic_parser": ("https://pylhc.github.io/generic_parser/", None),
"omc3": ("https://pylhc.github.io/omc3/", None),
Expand Down
2 changes: 1 addition & 1 deletion pylhc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
__title__ = "pylhc"
__description__ = "An accelerator physics script collection for the OMC team at CERN."
__url__ = "https://github.com/pylhc/pylhc"
__version__ = "0.8.0"
__version__ = "0.8.1"
__author__ = "pylhc"
__author_email__ = "[email protected]"
__license__ = "MIT"
Expand Down
6 changes: 4 additions & 2 deletions pylhc/constants/kickgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

# Columns ---
KICKGROUP = "KICKGROUP"
JSON_FILE = "JSON"
SDDS = "SDDS"
FILL = "FILL"
TURNS = "TURNS"
BUNCH = "BUNCH"
UTCTIME = "UTC"
Expand All @@ -29,6 +31,6 @@
BEAMPROCESS = "BEAMPROCESS"
BEAM = "BEAM"

KICK_COLUMNS = [UTCTIME, LOCALTIME, TUNEX, TUNEY, DRIVEN_TUNEX, DRIVEN_TUNEY, DRIVEN_TUNEZ, AMPX, AMPY, AMPZ, TURNS, BUNCH, SDDS, BEAM, OPTICS, OPTICS_URI, BEAMPROCESS]
COLUMNS_TO_HEADERS = [BEAM, BUNCH, TURNS, BEAMPROCESS, OPTICS, OPTICS_URI]
KICK_COLUMNS = [UTCTIME, LOCALTIME, TUNEX, TUNEY, DRIVEN_TUNEX, DRIVEN_TUNEY, DRIVEN_TUNEZ, AMPX, AMPY, AMPZ, TURNS, BUNCH, SDDS, JSON_FILE, BEAM, FILL, OPTICS, OPTICS_URI, BEAMPROCESS]
COLUMNS_TO_HEADERS = [BEAM, FILL, BUNCH, TURNS, BEAMPROCESS, OPTICS, OPTICS_URI]
KICK_GROUP_COLUMNS = [UTCTIME, LOCALTIME, KICKGROUP, TIMESTAMP]
14 changes: 7 additions & 7 deletions pylhc/data_extract/lsa.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ def find_beamprocess_history(
return fills

def get_trim_history(
self, beamprocess: str, knobs: list,
start_time: AccDatetime = None, end_time: AccDatetime = None,
accelerator: str = "lhc"
self, beamprocess: str, knobs: list,
start_time: AccDatetime = None, end_time: AccDatetime = None,
accelerator: str = "lhc"
) -> dict:
"""
Get trim history for knobs between specified times.
Expand All @@ -187,10 +187,10 @@ def get_trim_history(
raise ValueError("None of the given knobs exist!")

if start_time is not None:
start_time = start_time.timestamp()
start_time = start_time.timestamp()

if end_time is not None:
end_time = end_time.timestamp()
end_time = end_time.timestamp()

LOG.debug(f"Getting trims for {len(knobs)} knobs.")
try:
Expand Down Expand Up @@ -259,7 +259,7 @@ def get_knob_circuits(self, knob_name: str, optics: str) -> tfs.TfsDataFrame:
"""
Get a dataframe of the structure of the knob. Similar to online model extractor
(KnobExtractor.getKnobHiercarchy)

Args:
knob_name: name of the knob.
optics: name of the optics.
Expand Down
121 changes: 100 additions & 21 deletions pylhc/kickgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@

from datetime import datetime
from pathlib import Path
from typing import List, Union

import numpy as np
import pandas as pd
Expand All @@ -83,6 +82,8 @@
DRIVEN_TUNEX,
DRIVEN_TUNEY,
DRIVEN_TUNEZ,
FILL,
JSON_FILE,
KICK_COLUMNS,
KICK_GROUP_COLUMNS,
KICKGROUP,
Expand All @@ -105,7 +106,7 @@
# List Kickgroups --------------------------------------------------------------


def list_available_kickgroups(by: str = TIMESTAMP, root: Union[Path, str] = KICKGROUPS_ROOT, printout: bool = True) -> DataFrame:
def list_available_kickgroups(by: str = TIMESTAMP, root: Path | str = KICKGROUPS_ROOT, printout: bool = True) -> DataFrame:
"""
List all available KickGroups in `root` with optional sorting..

Expand Down Expand Up @@ -139,11 +140,11 @@ def list_available_kickgroups(by: str = TIMESTAMP, root: Union[Path, str] = KICK
return df_info


def get_folder_json_files(root: Union[Path, str] = KICKGROUPS_ROOT) -> List[Path]:
def get_folder_json_files(root: Path | str = KICKGROUPS_ROOT) -> list[Path]:
"""Returns a list of all **.json** files in the folder.

Args:
root (Union[Path, str])): the path to the folder. (Defaults
root (Path | str)): the path to the folder. (Defaults
to the ``NFS`` path of our kickgroups).

Returns:
Expand All @@ -157,7 +158,7 @@ def get_folder_json_files(root: Union[Path, str] = KICKGROUPS_ROOT) -> List[Path
# Kickgroup Info ---------------------------------------------------------------


def get_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT) -> TfsDataFrame:
def get_kickgroup_info(kick_group: str, root: Path | str = KICKGROUPS_ROOT) -> TfsDataFrame:
"""
Gather all important info about the KickGroup into a `~tfs.TfsDataFrame`.

Expand All @@ -176,7 +177,7 @@ def get_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT
df_info = TfsDataFrame(index=range(len(kicks_files)), columns=KICK_COLUMNS, headers={KICKGROUP: kick_group})

if not len(kicks_files):
raise FileNotFoundError(f"KickGroup {kick_group} contains no kicks.")
raise ValueError(f"KickGroup {kick_group} contains no kicks.")

for idx, kf in enumerate(kicks_files):
df_info.loc[idx, :] = load_kickfile(kf)
Expand All @@ -187,24 +188,34 @@ def get_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT
return df_info


def load_kickfile(kickfile: Union[Path, str]) -> pd.Series:
def load_kickfile(kickfile: Path | str) -> pd.Series:
"""
Load the important data from a **json** kickfile into a `~pandas.Series`.

Args:
kickfile (Union[Path, str]): the path to the kickfile to load data from.
kickfile (Path | str): the path to the kickfile to load data from.

Returns:
A `~pandas.Series` with the relevant information loaded from the provided
*kickfile*. The various entries in the Series are defined in `pylhc.constants.kickgroups`
as ``KICK_COLUMNS``.
"""
LOG.debug(f"Loading kick information from Kickfile at '{Path(kickfile).absolute()}'")
kickfile = _find_existing_file_path(kickfile)
kick = _load_json(kickfile)

data = pd.Series(index=KICK_COLUMNS, dtype=object)
data[JSON_FILE] = kickfile
data[LOCALTIME] = _jsontime_to_datetime(kick["acquisitionTime"])
data[UTCTIME] = _local_to_utc(data[LOCALTIME])
data[SDDS] = kick["sddsFile"]

try:
data[SDDS] = _find_existing_file_path(kick["sddsFile"])
except FileNotFoundError as e:
LOG.warning(str(e))
data[SDDS] = None

data[FILL] = _get_fill_from_path(kick["sddsFile"]) # TODO: Ask OP to include in json?
data[BEAM] = kick["measurementEnvironment"]["lhcBeam"]["beamName"]
data[BEAMPROCESS] = kick["measurementEnvironment"]["environmentContext"]["name"]
data[TURNS] = kick["acqSettings"]["capturedTurns"]
Expand All @@ -229,27 +240,85 @@ def load_kickfile(kickfile: Union[Path, str]) -> pd.Series:
data[AMPZ] = kick["excitationSettings"][0]["longitudinalRfSettings"]["excitationAmplitude"]
else:
LOG.debug("Kick is 2D Excitation, longitudinal settings will be set as NaNs")
idx = _get_plane_index(kick["excitationSettings"], "X")
idy = _get_plane_index(kick["excitationSettings"], "Y")
entry_map = {"X": (TUNEX, DRIVEN_TUNEX, AMPX), "Y": (TUNEY, DRIVEN_TUNEY, AMPY)}
for plane in ["X", "Y"]:
tune, driven_tune, amp = entry_map[plane]

data[tune] = np.NaN
data[driven_tune] = np.NaN
data[amp] = np.NaN

try:
idx = _get_plane_index(kick["excitationSettings"], plane)
except ValueError as e:
LOG.warning(f"{str(e)} in {kickfile}")
continue

if "measuredTune" not in kick["excitationSettings"][idx]: # Happens in very early files in 2022
LOG.warning(f"No measured tune {plane} in the kick file: {kickfile}")
continue

data[tune] = kick["excitationSettings"][idx]["measuredTune"]
data[driven_tune] = data[tune] + _get_delta_tune(kick, idx)
data[amp] = kick["excitationSettings"][idx]["amplitude"]

data[TUNEX] = kick["excitationSettings"][idx]["measuredTune"]
data[TUNEY] = kick["excitationSettings"][idy]["measuredTune"]
data[DRIVEN_TUNEX] = data[TUNEX] + kick["excitationSettings"][idx]["deltaTuneStart"]
data[DRIVEN_TUNEY] = data[TUNEY] + kick["excitationSettings"][idy]["deltaTuneStart"]
data[DRIVEN_TUNEZ] = np.NaN
data[AMPX] = kick["excitationSettings"][idx]["amplitude"]
data[AMPY] = kick["excitationSettings"][idy]["amplitude"]
data[AMPZ] = np.NaN

return data

def _get_delta_tune(kick: dict, idx_plane: int) -> float:
""" Return the delta from the tune for the kicks.
For some reason, there are multiple different keys where this can be stored. """

# Default key for ACDipole ---
# There is also "deltaTuneEnd", but we usually don't change the delta during kick
try:
return kick["excitationSettings"][idx_plane]["deltaTuneStart"]
except KeyError:
pass

# Key for ADTACDipole ---
try:
return kick["excitationSettings"][idx_plane]["deltaTune"]
except KeyError:
pass

# Another key for ADTACDipole (unclear to me why) ---
try:
return kick["excitationSettings"][idx_plane]["deltaTuneOffset"]
except KeyError:
pass

raise KeyError(f"Could not find delta tune for plane-entry {idx_plane}")


def _find_existing_file_path(path: str|Path) -> Path:
""" Find the existing kick file for the kick group. """
path = Path(path)
if path.is_file():
return path

fill_data = "FILL_DATA"
all_fill_data = "ALL_FILL_DATA"

if fill_data in path.parts:
# Fills are moved at the end of year
idx = path.parts.index(fill_data)+1
new_path = Path(*path.parts[:idx], all_fill_data, *path.parts[idx:])
if new_path.exists():
return new_path

raise FileNotFoundError(f"Could not find kick file at {path}")



# Functions with console output ---

# Full Info -


def show_kickgroup_info(kick_group: str, root: Union[Path, str] = KICKGROUPS_ROOT) -> None:
def show_kickgroup_info(kick_group: str, root: Path | str = KICKGROUPS_ROOT) -> None:
"""
Wrapper around `~pylhc.kickgroups.get_kickgroup_info`, gathering the relevant
information from the kick files in the group and printing it to console.
Expand Down Expand Up @@ -284,7 +353,7 @@ def _print_kickgroup_info(kicks_info: TfsDataFrame) -> None:
# Files only -


def show_kickgroup_files(kick_group: str, nfiles: int = None, root: Union[Path, str] = KICKGROUPS_ROOT) -> None:
def show_kickgroup_files(kick_group: str, nfiles: int = None, root: Path | str = KICKGROUPS_ROOT) -> None:
"""
Wrapper around `pylhc.kickgroups.get_kickgroup_info`, gathering the relevant
information from all kickfiles in the KickGroup and printing only the sdds-filepaths
Expand Down Expand Up @@ -337,7 +406,7 @@ def _print_kickgroup_files(kicks_info: TfsDataFrame, nfiles: int = None) -> None
# IO ---


def _load_json(jsonfile: Union[Path, str]) -> dict:
def _load_json(jsonfile: Path | str) -> dict:
return json.loads(Path(jsonfile).read_text())


Expand Down Expand Up @@ -371,7 +440,7 @@ def _local_to_utc(dt: datetime):
# Other ---


def _get_plane_index(data: List[dict], plane: str) -> str:
def _get_plane_index(data: list[dict], plane: str) -> str:
"""
Find the index for the given plane in the data list.
This is necessary as they are not always in X,Y order.
Expand All @@ -384,6 +453,16 @@ def _get_plane_index(data: List[dict], plane: str) -> str:
raise ValueError(f"Plane '{plane}' not found in data.")


def _get_fill_from_path(sdds_path: str | Path) -> str:
""" Get the fill number from the path to the sdds file.
Note: Not sure why the fill is not saved automatically into the .json file.
Maybe we should ask OP to include this.
"""
parts = Path(sdds_path).parts
idx_parent = parts.index("FILL_DATA")
return int(parts[idx_parent + 1])


# Script Mode ------------------------------------------------------------------


Expand Down