|
| 1 | +""" |
| 2 | +MAD-NG |
| 3 | +------ |
| 4 | +
|
| 5 | +This module provides functions to read and write ``MAD-NG`` turn-by-turn measurement files. These files |
| 6 | +are in the **TFS** format. |
| 7 | +
|
| 8 | +""" |
| 9 | + |
| 10 | +from __future__ import annotations |
| 11 | + |
| 12 | +import logging |
| 13 | + |
| 14 | +import pandas as pd |
| 15 | +import tfs |
| 16 | + |
| 17 | +from turn_by_turn.structures import TbtData, TransverseData |
| 18 | + |
| 19 | +LOGGER = logging.getLogger() |
| 20 | + |
| 21 | + |
| 22 | +# def read_tbt(file_path: str | Path) -> TbtData: |
| 23 | +def read_tbt(df: tfs.TfsDataFrame) -> TbtData: |
| 24 | + LOGGER.info("Starting to read TBT data") |
| 25 | + """ |
| 26 | + Reads turn-by-turn data from the ``MAD-NG`` **TFS** format file. |
| 27 | +
|
| 28 | + Args: |
| 29 | + file_path (str | Path): path to the turn-by-turn measurement file. |
| 30 | +
|
| 31 | + Returns: |
| 32 | + A ``TbTData`` object with the loaded data. |
| 33 | + """ |
| 34 | + # df = tfs.read(file_path) |
| 35 | + nturns = int(df.iloc[-1].loc["turn"]) |
| 36 | + npart = int(df.iloc[-1].loc["id"]) |
| 37 | + LOGGER.info(f"Number of turns: {nturns}, Number of particles: {npart}") |
| 38 | + |
| 39 | + # Get the unique BPMs and number of BPMs |
| 40 | + bpms = df["name"].unique() |
| 41 | + nbpms = len(bpms) |
| 42 | + |
| 43 | + # Set the index to the particle ID |
| 44 | + df.set_index(["id"], inplace=True) |
| 45 | + |
| 46 | + matrices = [] |
| 47 | + for particle_id in range(npart): |
| 48 | + LOGGER.info(f"Processing particle ID: {particle_id + 1}") |
| 49 | + # Filter the dataframe for the current particle and set index to the matrix dims |
| 50 | + subdf = df.loc[particle_id + 1] # Particle ID starts from 1 (not 0) |
| 51 | + print(len(subdf["name"]), nbpms, nturns) |
| 52 | + assert ( |
| 53 | + len(subdf["name"]) / nturns == nbpms |
| 54 | + ), "The number of BPMs is not consistent for all particles/turns. Simulation may have lost particles." |
| 55 | + subdf.set_index( |
| 56 | + ["eidx"], inplace=True |
| 57 | + ) # Must set index after getting unique BPMs |
| 58 | + |
| 59 | + # Create a dictionary of the TrackingData fields |
| 60 | + tracking_data_dict = { |
| 61 | + field: pd.DataFrame( |
| 62 | + index=bpms, |
| 63 | + data=subdf[field.lower()] |
| 64 | + .to_numpy() |
| 65 | + .reshape(nbpms, nturns, order="F"), # Number of BPMs x Number of turns |
| 66 | + ) |
| 67 | + for field in TransverseData.fieldnames() |
| 68 | + } |
| 69 | + |
| 70 | + # Append the TrackingData object to the matrices list |
| 71 | + matrices.append(TransverseData(**tracking_data_dict)) |
| 72 | + |
| 73 | + LOGGER.info("Finished reading TBT data") |
| 74 | + return TbtData(matrices=matrices, bunch_ids=list(range(npart)), nturns=nturns) |
0 commit comments