Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions tests/test_sps.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,56 @@ def test_write_read(tmp_path):
)
tmp_sdds = tmp_path / "sps_fake_data.sdds"
# Normal read/write test
sps.write_tbt(tmp_sdds, original, add_trailing_bpm_plane=False)
sps.write_tbt(tmp_sdds, original, add_trailing_bpm_plane=False)
read_sdds = sps.read_tbt(tmp_sdds, remove_trailing_bpm_plane=False)
compare_tbt(original, read_sdds, no_binary=True)

# Test no name changes when writing and planes already present
sps.write_tbt(tmp_sdds, original, add_trailing_bpm_plane=True)
sps.write_tbt(tmp_sdds, original, add_trailing_bpm_plane=True)
read_sdds = sps.read_tbt(tmp_sdds, remove_trailing_bpm_plane=False)
compare_tbt(original, read_sdds, no_binary=True)

# Test plane removal on reading
read_sdds = sps.read_tbt(tmp_sdds, remove_trailing_bpm_plane=True)
assert not any(read_sdds.matrices[0].X.index.str.endswith(".H"))
assert not any(read_sdds.matrices[0].Y.index.str.endswith(".V"))

# Test planes stay off when writing
sps.write_tbt(tmp_sdds, read_sdds, add_trailing_bpm_plane=False)
read_sdds = sps.read_tbt(tmp_sdds, remove_trailing_bpm_plane=False)
assert not any(read_sdds.matrices[0].X.index.str.endswith(".H"))
assert not any(read_sdds.matrices[0].Y.index.str.endswith(".V"))

# Test adding planes again
sps.write_tbt(tmp_sdds, read_sdds, add_trailing_bpm_plane=True)
sps.write_tbt(tmp_sdds, read_sdds, add_trailing_bpm_plane=True)
read_sdds = sps.read_tbt(tmp_sdds, remove_trailing_bpm_plane=False)
compare_tbt(original, read_sdds, no_binary=True)


def test_split_function():
""" Tests that the splitting function into planes works as expected. """

names_with_planes = ("bpm1.H", "bpm2.V", "bpm3.V", "bpm4.H")
x, y = sps._split_bpm_names_to_planes(names_with_planes)

assert all([bpm in x for bpm in ("bpm1.H", "bpm4.H")])
assert all([bpm in y for bpm in ("bpm2.V", "bpm3.V")])

names_without_planes = ("bpm1", "bpm2", "bpm3", "bpm4")
planes = (0, 1, 1, 0)
x, y = sps._split_bpm_names_to_planes(names_without_planes, planes)

for bpm, plane in zip(names_without_planes, planes):
assert bpm in x if plane == 0 else bpm in y
assert bpm not in x if plane == 1 else bpm not in y

planes = (1, 3, 1, 3)
x, y = sps._split_bpm_names_to_planes(names_without_planes, planes)

for bpm, plane in zip(names_without_planes, planes):
assert bpm in x if plane == 1 else bpm in y
assert bpm not in x if plane == 3 else bpm not in y


@pytest.fixture()
def _sps_file() -> Path:
Expand Down
2 changes: 1 addition & 1 deletion turn_by_turn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__title__ = "turn_by_turn"
__description__ = "Read and write turn-by-turn measurement files from different particle accelerator formats."
__url__ = "https://github.com/pylhc/turn_by_turn"
__version__ = "0.9.0"
__version__ = "0.9.1"
__author__ = "pylhc"
__author_email__ = "[email protected]"
__license__ = "MIT"
Expand Down
77 changes: 64 additions & 13 deletions turn_by_turn/sps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

Data handling for turn-by-turn measurement files from the ``SPS`` (files in **SDDS** format).
"""
from __future__ import annotations

import logging
from datetime import datetime
from pathlib import Path
import re
from typing import Union
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
Expand All @@ -18,24 +20,28 @@
from turn_by_turn.ascii import is_ascii_file, read_tbt as read_ascii
from turn_by_turn.structures import TbtData, TransverseData

if TYPE_CHECKING:
from collections.abc import Sequence

LOGGER = logging.getLogger(__name__)


# IDs
N_TURNS: str = "nbOfTurns"
TIMESTAMP: str = "timestamp"
BPM_NAMES: str = "MonNames"
BPM_PLANES: str = "MonPlanes"


def read_tbt(file_path: Union[str, Path], remove_trailing_bpm_plane: bool = True) -> TbtData:
def read_tbt(file_path: str | Path, remove_trailing_bpm_plane: bool = True) -> TbtData:
"""
Reads turn-by-turn data from the ``SPS``'s **SDDS** format file.
Will first determine if it is in ASCII format to figure out which reading method to use.

Args:
file_path (Union[str, Path]): path to the turn-by-turn measurement file.
file_path (str | Path): path to the turn-by-turn measurement file.
remove_trailing_bpm_plane (bool, optional): if ``True``, will remove the trailing
BPM plane ('.H', '.V') from the BPM-names.
BPM plane ('.H', '.V') from the BPM-names.
This makes the measurement data compatible with the madx-models.
Defaults to ``True``.

Expand All @@ -51,16 +57,16 @@ def read_tbt(file_path: Union[str, Path], remove_trailing_bpm_plane: bool = True
sdds_file = sdds.read(file_path)

nturns = sdds_file.values[N_TURNS]
date = datetime.fromtimestamp(sdds_file.values[TIMESTAMP] / 1e9, tz=tz.tzutc())
bpm_names = np.array(sdds_file.values[BPM_NAMES])
bpm_planes = np.array(sdds_file.values[BPM_PLANES]).astype(bool)
date = datetime.fromtimestamp(sdds_file.values[TIMESTAMP] / 1e9, tz=tz.tzutc())

bpm_names_y = bpm_names[bpm_planes]
bpm_names_x = bpm_names[~bpm_planes]
bpm_names_x, bpm_names_y = _split_bpm_names_to_planes(
sdds_file.values[BPM_NAMES],
sdds_file.values[BPM_PLANES]
)

tbt_data_x = [sdds_file.values[bpm] for bpm in bpm_names_x]
tbt_data_y = [sdds_file.values[bpm] for bpm in bpm_names_y]

if remove_trailing_bpm_plane:
pattern = re.compile(r"\.[HV]$", flags=re.IGNORECASE)
bpm_names_x = [pattern.sub("", bpm) for bpm in bpm_names_x]
Expand All @@ -76,17 +82,62 @@ def read_tbt(file_path: Union[str, Path], remove_trailing_bpm_plane: bool = True
return TbtData(matrices, date, [0], nturns)


def write_tbt(output_path: Union[str, Path], tbt_data: TbtData, add_trailing_bpm_plane: bool = True) -> None:
def _split_bpm_names_to_planes(bpm_names: Sequence[str], bpm_planes: Sequence[int] = ()) -> tuple[np.ndarray, np.ndarray]:
""" Splits BPM names into X and Y planes.

In the past, this was done by using the ``MonPlanes`` array, but in 2025 the SPS output changed from using
`1` for vertical and `0` for horizontal to `3` for vertical and `1` for horizontal.
It is therefore probably better to split based on the naming of the BPMs.
Comment on lines +88 to +90
Copy link
Member

@fsoubelet fsoubelet Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanksssss


Args:
bpm_names (Sequence[str]): BPM names
bpm_planes (Sequence[int], optional): BPM planes

Returns:
tuple[np.ndarray, np.ndarray]: BPM names for X and Y planes
"""
bpm_names = pd.Series(bpm_names)

if bpm_names.str.match(r".+\.[HV]$", flags=re.IGNORECASE).all():
# all names end in .V or .H -> split by name
vertical_bpms = bpm_names.str.endswith(".V")
else:
LOGGER.warning(
"Could not determine BPM planes from BPM names. "
"Splitting by the 'MonPlanes' array, which might be subject to changes."
)
if 3 in bpm_planes: # 2025 format splitting: 3 for vertical, 1 for horizontal
vertical_bpms = np.array(bpm_planes) == 3

elif 0 in bpm_planes: # pre-2025 format splitting: 1 for vertical, 0 for horizontal
vertical_bpms = np.array(bpm_planes).astype(bool)

else:
msg = "Could not determine SPS file format to split BPMs into planes."
raise ValueError(msg)

bpm_names_y = bpm_names[vertical_bpms].to_numpy()
bpm_names_x = bpm_names[~vertical_bpms].to_numpy()

return bpm_names_x, bpm_names_y


def write_tbt(output_path: str | Path, tbt_data: TbtData, add_trailing_bpm_plane: bool = True) -> None:
"""
Write a ``TbtData`` object's data to file, in a ``SPS``'s **SDDS** format.
The format is reduced to the necessary parameters used by the reader.
The format is reduced to the minimum parameters used by the reader.

WARNING: This writer uses ``0`` for horizontal and ``1`` for vertical BPMs
in the ``MonPlanes`` array, i.e. the pre-2025 format.

Args:
output_path (Union[str, Path]): path to a the disk location where to write the data.
output_path (str | Path): path to a the disk location where to write the data.
tbt_data (TbtData): the ``TbtData`` object to write to disk.
add_trailing_bpm_plane (bool, optional): if ``True``, will add the trailing
BPM plane ('.H', '.V') to the BPM-names. This assures that all BPM-names are unique,
and that the measurement data is compatible with the sdds files from the FESA-class.
WARNING: If present, these will be used to determine the plane of the BPMs,
otherwise the ``MonPlanes`` array will be used.
Defaults to ``True``.
"""
output_path = Path(output_path)
Expand Down