|
| 1 | +import subprocess |
| 2 | +import sys |
| 3 | +from pathlib import Path |
| 4 | + |
| 5 | +import pytest |
| 6 | +import tfs |
| 7 | + |
| 8 | +from tests.helpers import (CIRCUIT, EPS, IP, MAX_N, STRENGTH, VALUE, generate_errortable, |
| 9 | + generate_pseudo_model, get_some_magnet_names) |
| 10 | + |
| 11 | + |
| 12 | +def test_package_execution_no_arguments(): |
| 13 | + """ Tests if the package can be run as a module. |
| 14 | + Without arguments it should abort with an error message stating required arguments. """ |
| 15 | + # Run the package as a module using subprocess |
| 16 | + result = subprocess.run([sys.executable, "-m", "irnl_rdt_correction"], capture_output=True, text=True) |
| 17 | + |
| 18 | + # Check if the command executed with sysexit (exit code 2) |
| 19 | + assert result.returncode == 2 |
| 20 | + |
| 21 | + # Check for expected output |
| 22 | + expected_output = "error: the following arguments are required: --beams, --twiss" |
| 23 | + assert expected_output in result.stderr |
| 24 | + |
| 25 | + |
| 26 | +def test_package_execution_fix_v1_1_2(): |
| 27 | + """ Tests if the package can be run as a module. This failed in <1.1.2 with an import error. """ |
| 28 | + # Run the package as a module using subprocess |
| 29 | + result = subprocess.run([sys.executable, "-m", "irnl_rdt_correction"], capture_output=True, text=True) |
| 30 | + |
| 31 | + # Check if the command executed not with an import error (exit code 1) |
| 32 | + assert result.returncode != 1 |
| 33 | + |
| 34 | + # Check for a module not found error |
| 35 | + not_expected_output = "ModuleNotFoundError: No module named 'utilities'" |
| 36 | + assert not_expected_output not in result.stderr |
| 37 | + |
| 38 | + |
| 39 | +@pytest.mark.parametrize('accel', ('lhc', 'hllhc')) |
| 40 | +def test_basic_correction(tmp_path: Path, accel: str): |
| 41 | + """Tests the basic correction functionality and performs some sanity checks. |
| 42 | + Same as in tets_standard_correction, but less testcases and called from commandline. |
| 43 | +
|
| 44 | + Operates on a pseudo-model so that the corrector values are easily known. |
| 45 | + Sanity Checks: |
| 46 | + - all correctors found |
| 47 | + - correctors have the correct value (as set by errors or zero) |
| 48 | + - all corrector circuits present in madx-script |
| 49 | + """ |
| 50 | + # Parameters ----------------------------------------------------------- |
| 51 | + order = 4 |
| 52 | + orientation = "" |
| 53 | + |
| 54 | + correct_ips = (1, 3) |
| 55 | + error_value = 2 |
| 56 | + n_magnets = 4 |
| 57 | + n_ips = 4 |
| 58 | + |
| 59 | + n_correct_ips = len(correct_ips) |
| 60 | + n_sides = len("LR") |
| 61 | + n_orientation = len(["S", ""]) |
| 62 | + |
| 63 | + # Setup ---------------------------------------------------------------- |
| 64 | + twiss = generate_pseudo_model(accel=accel, n_ips=n_ips, n_magnets=n_magnets) |
| 65 | + errors = generate_errortable(index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets)) |
| 66 | + error_component = f"K{order-1}{orientation}L" |
| 67 | + errors[error_component] = error_value |
| 68 | + |
| 69 | + if order % 2: # order is odd -> sides have different sign in rdt |
| 70 | + left_hand_magnets = errors.index.str.match(r".*L\d$") |
| 71 | + errors.loc[left_hand_magnets, error_component] = errors.loc[left_hand_magnets, error_component] / 2 # so they don't fully compensate |
| 72 | + |
| 73 | + twiss_path = tmp_path / "twiss_input.tfs" |
| 74 | + errors_path = tmp_path / "errors_input.tfs" |
| 75 | + result_path = tmp_path / "correct" |
| 76 | + |
| 77 | + tfs.write(twiss_path, twiss, save_index="NAME") |
| 78 | + tfs.write(errors_path, errors, save_index="NAME") |
| 79 | + |
| 80 | + # Correction ----------------------------------------------------------- |
| 81 | + subprocess.run([sys.executable, "-m", |
| 82 | + "irnl_rdt_correction", |
| 83 | + "--accel", accel, |
| 84 | + "--twiss", twiss_path, |
| 85 | + "--errors", errors_path, |
| 86 | + "--beams", "1", |
| 87 | + "--output", result_path, |
| 88 | + "--feeddown", "0", |
| 89 | + "--ips", *[str(ip) for ip in correct_ips], |
| 90 | + "--ignore_missing_columns", |
| 91 | + "--iterations", "1", |
| 92 | + ], |
| 93 | + capture_output=True, text=True |
| 94 | + ) |
| 95 | + |
| 96 | + madx_corrections = result_path.with_suffix(".madx").read_text() |
| 97 | + df_corrections = tfs.read(result_path.with_suffix(".tfs")) |
| 98 | + |
| 99 | + # Testing -------------------------------------------------------------- |
| 100 | + # Same as in test_standard_correction - TODO: make function? |
| 101 | + # Check output data --- |
| 102 | + assert len(list(tmp_path.glob("correct.*"))) == 2 |
| 103 | + |
| 104 | + # Check all found correctors --- |
| 105 | + if accel == 'lhc': |
| 106 | + assert len(df_corrections.index) == ( |
| 107 | + n_orientation * n_sides * n_correct_ips * len("SO") + |
| 108 | + n_sides * n_correct_ips * len("T") |
| 109 | + ) |
| 110 | + |
| 111 | + if accel == 'hllhc': |
| 112 | + assert len(df_corrections.index) == n_orientation * n_sides * n_correct_ips * len("SODT") |
| 113 | + |
| 114 | + # All circuits in madx script --- |
| 115 | + for circuit in df_corrections[CIRCUIT]: |
| 116 | + assert circuit in madx_corrections |
| 117 | + |
| 118 | + # Check corrector values --- |
| 119 | + for test_order in range(3, MAX_N+1): |
| 120 | + for test_orientation in ("S", ""): |
| 121 | + for ip in correct_ips: |
| 122 | + mask = ( |
| 123 | + (df_corrections[STRENGTH] == f"K{test_order-1}{test_orientation}L") & |
| 124 | + (df_corrections[IP] == ip) |
| 125 | + ) |
| 126 | + if (test_order == order) and (test_orientation == orientation): |
| 127 | + if order % 2: |
| 128 | + corrector_strengths = sum(df_corrections.loc[mask, VALUE]) |
| 129 | + assert abs(corrector_strengths) < EPS # correctors should be equally distributed |
| 130 | + |
| 131 | + corrector_strengths = -sum(df_corrections.loc[mask, VALUE].abs()) |
| 132 | + # as beta cancels out (and is 1 anyway) |
| 133 | + error_strengths = n_magnets * error_value / 2 # account for partial compensation (from above) |
| 134 | + else: |
| 135 | + corrector_strengths = sum(df_corrections.loc[mask, VALUE]) |
| 136 | + assert all(abs(df_corrections.loc[mask, VALUE] - corrector_strengths / n_sides) < EPS) |
| 137 | + # as beta cancels out (and is 1 anyway) |
| 138 | + error_strengths = (n_sides * n_magnets * error_value) |
| 139 | + assert abs(corrector_strengths + error_strengths) < EPS # compensation of RDT |
| 140 | + else: |
| 141 | + assert all(df_corrections.loc[mask, VALUE] == 0.) |
0 commit comments