diff --git a/pySC/correction/loco.py b/pySC/correction/loco.py index 52212a8..8ce94be 100644 --- a/pySC/correction/loco.py +++ b/pySC/correction/loco.py @@ -1,167 +1,311 @@ -import at -import numpy as np -import multiprocessing -from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion -from pySC.core.constants import SETTING_ADD, TRACK_ORB -from pySC.core.beam import bpm_reading -import matplotlib.pyplot as plt -from scipy.optimize import least_squares -from pySC.utils import logging_tools, sc_tools -LOGGER = logging_tools.get_logger(__name__) - - -def calculate_jacobian(SC, C_model, dkick, used_cor_ind, bpm_indexes, quads_ind, dk, trackMode=TRACK_ORB, - useIdealRing=True, skewness=False, order=1, method=SETTING_ADD, includeDispersion=False, rf_step=1E3, - cav_ords=None, full_jacobian=True): - pool = multiprocessing.Pool() - quad_args = [(quad_index, SC, C_model, dkick, used_cor_ind, bpm_indexes, dk, trackMode, useIdealRing, - skewness, order, method, includeDispersion, rf_step, cav_ords) for quad_index in quads_ind] - # results = [] - # for quad_arg in quad_args: - # results.append(generating_quads_response_matrices(quad_arg)) - results = pool.map(generating_quads_response_matrices, quad_args) - pool.close() - pool.join() - - results = [result / dk for result in results] - if full_jacobian: # Construct the complete Jacobian matrix for the LOCO - # assuming only linear scaling errors of BPMs and corrector magnets - n_correctors = len(np.concatenate(used_cor_ind)) - n_bpms = len(bpm_indexes) * 2 # in both planes - j_cor = np.zeros((n_correctors,) + C_model.shape) - for i in range(n_correctors): - j_cor[i, :, i] = C_model[:, i] # a single column of response matrix corresponding to a corrector - j_bpm = np.zeros((n_bpms,) + C_model.shape) - for i in range(n_bpms): - j_bpm[i, i, :] = C_model[i, :] # a single row of response matrix corresponding to a given plane of BPM - return np.concatenate((results, j_cor, j_bpm), axis=0) - return results - - -def generating_quads_response_matrices(args): - (quad_index, SC, C_model, correctors_kick, used_cor_indexes, used_bpm_indexes, dk, trackMode, useIdealRing, - skewness, order, method, includeDispersion, rf_step, cav_ords) = args - LOGGER.debug('generating response to quad of index', quad_index) - if not includeDispersion: - SC.set_magnet_setpoints(quad_index, dk, skewness, order, method) - C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick, - useIdealRing=useIdealRing, - trackMode=trackMode) - SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method) - return C_measured - C_model - - dispersion_model = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords) - SC.set_magnet_setpoints(quad_index, dk, skewness, order, method) - C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick, useIdealRing=useIdealRing, - trackMode=trackMode) - dispersion_meas = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords, rfStep=rf_step) - SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method) - return np.hstack((C_measured - C_model, (dispersion_meas - dispersion_model).reshape(-1, 1))) - - -def measure_closed_orbit_response_matrix(SC, bpm_ords, cm_ords, dkick=1e-5): - LOGGER.info('Calculating Measure response matrix') - n_turns = 1 - n_bpms = len(bpm_ords) - n_cms = len(cm_ords[0]) + len(cm_ords[1]) - response_matrix = np.full((2 * n_bpms * n_turns, n_cms), np.nan) - SC.INJ.trackMode = TRACK_ORB # TODO may modify SC (not a pure function)! - - closed_orbits0 = bpm_reading(SC, bpm_ords=bpm_ords)[0] - cnt = 0 - for n_dim in range(2): - for cm_ord in cm_ords[n_dim]: - SC.set_cm_setpoints(cm_ord, dkick, skewness=bool(n_dim), method=SETTING_ADD) - closed_orbits1 = bpm_reading(SC, bpm_ords=bpm_ords)[0] - SC.set_cm_setpoints(cm_ord, -dkick, skewness=bool(n_dim), method=SETTING_ADD) - response_matrix[:, cnt] = np.ravel((closed_orbits1 - closed_orbits0) / dkick) - cnt = cnt + 1 - return response_matrix - - -def loco_correction_lm(initial_guess0, orm_model, orm_measured, Jn, lengths, including_fit_parameters, bounds=(-np.inf, np.inf), weights=1, - verbose=2): - mask = _get_parameters_mask(including_fit_parameters, lengths) - result = least_squares(lambda delta_params: objective(delta_params, orm_measured - orm_model, Jn[mask, :, :], weights), - initial_guess0[mask], #bounds=bounds, - method="lm", - verbose=verbose) - return result.x - - -def loco_correction_ng(initial_guess0, orm_model, orm_measured, J, lengths, including_fit_parameters, s_cut, weights=1): - initial_guess = initial_guess0.copy() - mask = _get_parameters_mask(including_fit_parameters, lengths) - residuals = objective(initial_guess[mask], orm_measured - orm_model, J[mask, :, :], weights) - r = residuals.reshape(np.transpose(orm_model).shape) - t2 = np.zeros([len(initial_guess[mask]), 1]) - for i in range(len(initial_guess[mask])): - t2[i] = np.sum(np.dot(np.dot(J[i].T, weights), r.T)) - return get_inverse(J[mask, :, :], t2, s_cut, weights) - - -def objective(masked_params, orm_residuals, masked_jacobian, weights): - return np.dot(np.transpose(orm_residuals - np.einsum("ijk,i->jk", masked_jacobian, masked_params)), - np.sqrt(weights)).ravel() - - -def _get_parameters_mask(including_fit_parameters, lengths): - len_quads, len_corr, len_bpm = lengths - mask = np.zeros(len_quads + len_corr + len_bpm, dtype=bool) - mask[:len_quads] = 'quads' in including_fit_parameters - mask[len_quads:len_quads + len_corr] = 'cor' in including_fit_parameters - mask[len_quads + len_corr:] = 'bpm' in including_fit_parameters - return mask - - -def set_correction(SC, r, elem_ind, individuals=True, skewness=False, order=1, method=SETTING_ADD, dipole_compensation=True): - if individuals: - SC.set_magnet_setpoints(elem_ind, -r, skewness, order, method, dipole_compensation=dipole_compensation) - return SC - - for fam_num, quad_fam in enumerate(elem_ind): - SC.set_magnet_setpoints(quad_fam, -r[fam_num], skewness, order, method, dipole_compensation=dipole_compensation) - return SC - - -def model_beta_beat(ring, twiss, elements_indexes, plot=False): - _, _, twiss_error = at.get_optics(ring, elements_indexes) - s_pos = twiss_error.s_pos - bx = np.array(twiss_error.beta[:, 0] / twiss.beta[:, 0] - 1) - by = np.array(twiss_error.beta[:, 1] / twiss.beta[:, 1] - 1) - bx_rms = np.sqrt(np.mean(bx ** 2)) - by_rms = np.sqrt(np.mean(by ** 2)) - - if plot: - init_font = plt.rcParams["font.size"] - plt.rcParams.update({'font.size': 14}) - - fig, ax = plt.subplots(nrows=2, sharex="all") - betas = [bx, by] - letters = ("x", "y") - for i in range(2): - ax[i].plot(s_pos, betas[i]) - ax[i].set_xlabel("s_pos [m]") - ax[i].set_ylabel(rf'$\Delta\beta_{letters[i]}$ / $\beta_{letters[i]}$') - ax[i].ticklabel_format(axis='y', style='sci', scilimits=(0, 0)) - ax[i].grid(True, which='both', linestyle=':', color='gray') - - fig.show() - plt.rcParams.update({'font.size': init_font}) - - return bx_rms, by_rms - - -def select_equally_spaced_elements(total_elements, num_elements): - step = len(total_elements) // (num_elements - 1) - return total_elements[::step] - - -def get_inverse(jacobian, B, s_cut, weights, plot=False): - n_resp_mats = len(jacobian) - sum_corr = np.sum(jacobian, axis=2) # Sum over i and j for all planes - matrix = np.dot(np.dot(sum_corr, weights), sum_corr.T) - inv_matrix = sc_tools.pinv(matrix, num_removed=n_resp_mats - min(n_resp_mats, s_cut), plot=plot) - results = np.ravel(np.dot(inv_matrix, B)) - # e = np.ravel(np.dot(matrix, results)) - np.ravel(B) - return results +import at +import numpy as np +import multiprocessing +from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion +from pySC.core.constants import SETTING_ADD, TRACK_ORB +from pySC.core.beam import bpm_reading +import matplotlib.pyplot as plt +from scipy.optimize import least_squares +from pySC.utils import logging_tools, sc_tools, at_wrapper +from pySC.lattice_properties.response_model import SCgetModelRING, orbpass + +LOGGER = logging_tools.get_logger(__name__) + + +def calculate_jacobian(SC, C_model, dkick, used_cor_ind, bpm_indexes, quads_ind, dk, trackMode=TRACK_ORB, + useIdealRing=True, skewness=False, order=1, method=SETTING_ADD, includeDispersion=False, + rf_step=1E3, + cav_ords=None, full_jacobian=True): + pool = multiprocessing.Pool() + quad_args = [(quad_index, SC, C_model, dkick, used_cor_ind, bpm_indexes, dk, trackMode, useIdealRing, + skewness, order, method, includeDispersion, rf_step, cav_ords) for quad_index in quads_ind] + # results = [] + # for quad_arg in quad_args: + # results.append(generating_quads_response_matrices(quad_arg)) + results = pool.map(generating_quads_response_matrices, quad_args) + pool.close() + pool.join() + + results = [result / dk for result in results] + if full_jacobian: # Construct the complete Jacobian matrix for the LOCO + # assuming only linear scaling errors of BPMs and corrector magnets + n_correctors = len(np.concatenate(used_cor_ind)) + n_bpms = len(bpm_indexes) * 2 # in both planes + j_cor = np.zeros((n_correctors,) + C_model.shape) + for i in range(n_correctors): + j_cor[i, :, i] = C_model[:, i] # a single column of response matrix corresponding to a corrector + j_bpm = np.zeros((n_bpms,) + C_model.shape) + for i in range(n_bpms): + j_bpm[i, i, :] = C_model[i, :] # a single row of response matrix corresponding to a given plane of BPM + return np.concatenate((results, j_cor, j_bpm), axis=0) + return results + + +def generating_quads_response_matrices(args): + (quad_index, SC, C_model, correctors_kick, used_cor_indexes, used_bpm_indexes, dk, trackMode, useIdealRing, + skewness, order, method, includeDispersion, rf_step, cav_ords) = args + LOGGER.debug('generating response to quad of index', quad_index) + if not includeDispersion: + SC.set_magnet_setpoints(quad_index, dk, skewness, order, method) + C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick, + useIdealRing=useIdealRing, + trackMode=trackMode) + SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method) + return C_measured - C_model + + # dispersion_model = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords, rfStep=rf_step) + _, _, twiss = at.get_optics(SC.IDEALRING, used_bpm_indexes) ## ADD dispersion to the ORMs from AT get_optics + dx = twiss.dispersion[:, 0] + dy = twiss.dispersion[:, 2] + dispersion_model = np.column_stack((dx, dy)) + SC.set_magnet_setpoints(quad_index, dk, skewness, order, method) + C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick, useIdealRing=useIdealRing, + trackMode=trackMode) + # dispersion_meas = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords, rfStep=rf_step, useIdealRing=False) + _, _, twiss = at.get_optics(SC.RING, used_bpm_indexes) + dx = twiss.dispersion[:, 0] + dy = twiss.dispersion[:, 2] + dispersion_meas = np.column_stack((dx, dy)) + C_measured = np.hstack((C_measured, ((dispersion_meas - dispersion_model) / correctors_kick).reshape(-1, 1))) + SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method) + return C_measured - C_model + + +def measure_closed_orbit_response_matrix(SC, bpm_ords, cm_ords, dkick=1e-5, includeDispersion=False): + LOGGER.info('Calculating Measure response matrix') + n_turns = 1 + n_bpms = len(bpm_ords) + n_cms = len(cm_ords[0]) + len(cm_ords[1]) + response_matrix = np.full((2 * n_bpms * n_turns, n_cms), np.nan) + SC.INJ.trackMode = TRACK_ORB # TODO may modify SC (not a pure function)! + + closed_orbits0 = bpm_reading(SC, bpm_ords=bpm_ords)[0] + cnt = 0 + for n_dim in range(2): + for cm_ord in cm_ords[n_dim]: + SC.set_cm_setpoints(cm_ord, dkick, skewness=bool(n_dim), method=SETTING_ADD) + closed_orbits1 = bpm_reading(SC, bpm_ords=bpm_ords)[0] + SC.set_cm_setpoints(cm_ord, -dkick, skewness=bool(n_dim), method=SETTING_ADD) + response_matrix[:, cnt] = np.ravel((closed_orbits1 - closed_orbits0) / dkick) + cnt = cnt + 1 + if includeDispersion == True: + _, _, twiss = at.get_optics(SC.RING, bpm_ords) + dx = twiss.dispersion[:, 0] + dy = twiss.dispersion[:, 2] + dispersion_meas = np.column_stack((dx, dy)) + return np.hstack( + (response_matrix, ((dispersion_meas) / dkick).reshape(-1, 1))) + return response_matrix + + +def loco_correction_lm(initial_guess0, orm_model, orm_measured, Jn, lengths, including_fit_parameters, + bounds=(-np.inf, np.inf), weights=1, + verbose=2): + mask = _get_parameters_mask2(including_fit_parameters, lengths) + result = least_squares( + lambda delta_params: objective(delta_params, orm_measured - orm_model, Jn[mask, :, :], weights), + initial_guess0[mask], # bounds=bounds, + method="lm", + verbose=verbose) + return result.x + + +def loco_correction_ng(initial_guess0, orm_model, orm_measured, J, lengths, including_fit_parameters, s_cut, weights=1, + includeDispersion=False): + initial_guess = initial_guess0.copy() + mask = _get_parameters_mask2(including_fit_parameters, lengths) + residuals = objective(initial_guess[mask], orm_measured - orm_model, J[mask, :, :], weights) + r = residuals.reshape(np.transpose(orm_model).shape) + t2 = np.zeros([len(initial_guess[mask]), 1]) + for i in range(len(initial_guess[mask])): + t2[i] = np.sum(np.dot(np.dot(J[i].T, weights), r.T)) + return get_inverse(J[mask, :, :], t2, s_cut, weights) + + +def objective(masked_params, orm_residuals, masked_jacobian, weights): + return np.dot(np.transpose(orm_residuals - np.einsum("ijk,i->jk", masked_jacobian, masked_params)), + np.sqrt(weights)).ravel() + + +def _get_parameters_mask(including_fit_parameters, lengths): + len_quads, len_corr, len_bpm = lengths + mask = np.zeros(len_quads + len_corr + len_bpm, dtype=bool) + mask[:len_quads] = 'quads' in including_fit_parameters + mask[len_quads:len_quads + len_corr] = 'cor' in including_fit_parameters + mask[len_quads + len_corr:] = 'bpm' in including_fit_parameters + return mask + + +def _get_parameters_mask2(including_fit_parameters, lengths): + mask = np.zeros(sum(lengths), dtype=bool) + current_index = 0 + for param, length in zip(including_fit_parameters, lengths): + mask[current_index:current_index + length] = True + current_index += length + return mask + + +def set_correction(SC, r, elem_ind, individuals=True, skewness=False, order=1, method=SETTING_ADD, + dipole_compensation=True): + if individuals: + SC.set_magnet_setpoints(elem_ind, -r, skewness, order, method, dipole_compensation=dipole_compensation) + return SC + + for fam_num, quad_fam in enumerate(elem_ind): + SC.set_magnet_setpoints(quad_fam, -r[fam_num], skewness, order, method, dipole_compensation=dipole_compensation) + return SC + + +def model_beta_beat(ring, twiss, elements_indexes, plot=False): + _, _, twiss_error = at.get_optics(ring, elements_indexes) + s_pos = twiss_error.s_pos + bx = np.array(twiss_error.beta[:, 0] / twiss.beta[:, 0] - 1) + by = np.array(twiss_error.beta[:, 1] / twiss.beta[:, 1] - 1) + bx_rms = np.sqrt(np.mean(bx ** 2)) + by_rms = np.sqrt(np.mean(by ** 2)) + + if plot: + init_font = plt.rcParams["font.size"] + plt.rcParams.update({'font.size': 14}) + + fig, ax = plt.subplots(nrows=2, sharex="all") + betas = [bx, by] + letters = ("x", "y") + for i in range(2): + ax[i].plot(s_pos, betas[i]) + ax[i].set_xlabel("s_pos [m]") + ax[i].set_ylabel(rf'$\Delta\beta_{letters[i]}$ / $\beta_{letters[i]}$') + ax[i].ticklabel_format(axis='y', style='sci', scilimits=(0, 0)) + ax[i].grid(True, which='both', linestyle=':', color='gray') + + fig.show() + plt.rcParams.update({'font.size': init_font}) + + return bx_rms, by_rms + + +def select_equally_spaced_elements(total_elements, num_elements): + step = len(total_elements) // (num_elements - 1) + return total_elements[::step] + + +def get_inverse(jacobian, B, s_cut, weights, plot=False): + n_resp_mats = len(jacobian) + sum_corr = np.sum(jacobian, axis=2) # Sum over i and j for all planes + matrix = np.dot(np.dot(sum_corr, weights), sum_corr.T) + inv_matrix = sc_tools.pinv(matrix, num_removed=n_resp_mats - min(n_resp_mats, s_cut), plot=plot) + results = np.ravel(np.dot(inv_matrix, B)) + # e = np.ravel(np.dot(matrix, results)) - np.ravel(B) + return results + + +''' +The code below is for optics calculations (based on AT get_optics) and generates plots for analyzing the lattice. + Example usage: + + [-, -, twiss] = at.get_optics(SC.IDEALRING, SC.ORD.BPM) + analyze_ring(SC, twiss, SC.ORD.BPM, useIdealRing=False, makeplot=False) +''' + + +def analyze_ring(SC, twiss, bpm_indices, useIdealRing=True, makeplot=False): + ring = SC.IDEALRING.deepcopy() if useIdealRing else SC.RING # SCgetModelRING(SC) + rmsx, rmsy = rms_orbits(ring, bpm_indices) + bx_rms_err, by_rms_err = getBetaBeat(ring, twiss, bpm_indices) + dx_rms_err, dy_rms_err = getDispersionErr(ring, twiss, bpm_indices) + + print(f"RMS horizontal orbit: {rmsx * 1.e6:.2f} µm, RMS vertical orbit: {rmsy * 1.e6:.2f} µm") + print(f"RMS horizontal beta beating: {bx_rms_err * 100:.2f}%, RMS vertical beta beating: {by_rms_err * 100:.2f}%") + print( + f"RMS relative horizontal dispersion: {dx_rms_err:.4f} mm, RMS relative vertical dispersion: {dy_rms_err:.4f} mm") + print(f"Tune values: {at.get_tune(ring, get_integer=True)}, Chromaticity values: {at.get_chrom(ring)}") + + if makeplot: + plot_orbits(ring, bpm_indices) + plot_beta_beat(ring, twiss, bpm_indices) + plot_dispersion_err(ring, twiss, bpm_indices) + + +def calculate_rms(data): + return np.sqrt(np.mean(data ** 2)) + + +def plot_data(s_pos, data, xlabel, ylabel, title): + plt.rc('font', size=13) + fig, ax = plt.subplots(figsize=(6, 3)) + ax.plot(s_pos, data) + ax.set_xlabel(xlabel, fontsize=14) + ax.set_ylabel(ylabel, fontsize=14) + ax.ticklabel_format(axis='y', style='sci', scilimits=(0, 0)) + ax.grid(True, which='both', linestyle=':', color='gray') + plt.title(title) + plt.show() + + +def rms_orbits(ring, elements_indexes, trackMode='ORB', Z0=np.zeros(6)): + track_methods = dict(TBT=at_wrapper.atpass, ORB=orbpass) + if trackMode == 'ORB': + nTurns = 1 + trackmethod = track_methods['ORB'] + Ta = trackmethod(ring, Z0=Z0, nTurns=nTurns, REFPTS=elements_indexes) + + closed_orbitx = np.ravel(np.transpose(Ta[0, :, :, :], axes=(2, 1, 0))) + closed_orbity = np.ravel(np.transpose(Ta[2, :, :, :], axes=(2, 1, 0))) + + rmsx = calculate_rms(closed_orbitx) + rmsy = calculate_rms(closed_orbity) + + return rmsx, rmsy + + +def getBetaBeat(ring, twiss, elements_indexes): + _, _, twiss_error = at.get_optics(ring, elements_indexes) + bx = (twiss_error.beta[:, 0] - twiss.beta[:, 0]) / twiss.beta[:, 0] + by = (twiss_error.beta[:, 1] - twiss.beta[:, 1]) / twiss.beta[:, 1] + + bx_rms = calculate_rms(bx) + by_rms = calculate_rms(by) + + return bx_rms, by_rms + + +def getDispersionErr(ring, twiss, elements_indexes): + _, _, twiss_error = at.get_optics(ring, elements_indexes) + dx = twiss_error.dispersion[:, 0] - twiss.dispersion[:, 0] + dy = twiss_error.dispersion[:, 2] - twiss.dispersion[:, 2] + + dx_rms = calculate_rms(dx) + dy_rms = calculate_rms(dy) + + return dx_rms, dy_rms + + +def plot_orbits(ring, elements_indexes, trackMode='ORB', Z0=np.zeros(6)): + _, _, twiss = at.get_optics(ring, elements_indexes) + track_methods = dict(TBT=at_wrapper.atpass, ORB=orbpass) + if trackMode == 'ORB': + nTurns = 1 + trackmethod = track_methods['ORB'] + Ta = trackmethod(ring, Z0=Z0, nTurns=nTurns, REFPTS=elements_indexes) + closed_orbitx = np.ravel(np.transpose(Ta[0, :, :, :], axes=(2, 1, 0))) / 1.e-06 + closed_orbity = np.ravel(np.transpose(Ta[2, :, :, :], axes=(2, 1, 0))) / 1.e-06 + plot_data(twiss.s_pos, closed_orbitx, "s_pos [m]", r"closed_orbit x [$\mu$m]", "Horizontal closed orbit") + plot_data(twiss.s_pos, closed_orbity, "s_pos [m]", r"closed_orbit y [$\mu$m]", "Vertical closed orbit") + + +def plot_beta_beat(ring, twiss, elements_indexes): + _, _, twiss_error = at.get_optics(ring, elements_indexes) + bx = (twiss_error.beta[:, 0] - twiss.beta[:, 0]) / twiss.beta[:, 0] + by = (twiss_error.beta[:, 1] - twiss.beta[:, 1]) / twiss.beta[:, 1] + + plot_data(twiss_error.s_pos, bx, "s_pos [m]", r'$\Delta \beta_x / \beta_x$', "Horizontal beta beating") + plot_data(twiss_error.s_pos, by, "s_pos [m]", r'$\Delta \beta_y / \beta_y$', "Vertical beta beating") + + +def plot_dispersion_err(ring, twiss, elements_indexes): + _, _, twiss_error = at.get_optics(ring, elements_indexes) + dx = twiss_error.dispersion[:, 0] - twiss.dispersion[:, 0] + dy = twiss_error.dispersion[:, 2] - twiss.dispersion[:, 2] + + plot_data(twiss_error.s_pos, dx, "s_pos [m]", r'$\Delta \eta_x$ [m]', "Horizontal dispersion error") + plot_data(twiss_error.s_pos, dy, "s_pos [m]", r'$\Delta \eta_y$ [m]', "Vertical dispersion error") diff --git a/pySC/example.py b/pySC/example.py index 2cdcd4d..6cff613 100644 --- a/pySC/example.py +++ b/pySC/example.py @@ -8,8 +8,7 @@ from pySC.core.beam import bpm_reading, beam_transmission from pySC.correction.tune import tune_scan from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion -from pySC.correction.loco_wrapper import (loco_model, loco_fit_parameters, apply_lattice_correction, loco_measurement, - loco_bpm_structure, loco_cm_structure) +from pySC.correction import loco from pySC.plotting.plot_phase_space import plot_phase_space from pySC.plotting.plot_support import plot_support from pySC.plotting.plot_lattice import plot_lattice @@ -184,26 +183,177 @@ def _marker(name): SC, _, _, _ = tune_scan(SC, np.vstack((sc_tools.ords_from_regex(SC.RING, 'QF'), sc_tools.ords_from_regex(SC.RING, 'QD'))), np.outer(np.ones(2), 1 + np.linspace(-0.01, 0.01, 51)), do_plot=False, nParticles=100, nTurns=200) - CMstep = 1E-4 # [rad] # TODO later in the structure it is in mrad, ??? - RFstep = 1E3 # [Hz] - ring_data, loco_flags, init = loco_model(SC, Dispersion=True, HorizontalDispersionWeight=.1E2, - VerticalDispersionWeight= .1E2) - bpm_data = loco_bpm_structure(SC, FitGains=True) - cm_data = loco_cm_structure(SC, CMstep, FitKicks=True) - loco_meas_data = loco_measurement(SC, CMstep, RFstep, SC.ORD.BPM, SC.ORD.CM) - fit_parameters = loco_fit_parameters(SC, init.SC.RING, ring_data, RFstep, - [sc_tools.ords_from_regex(SC.RING, 'QF'), False, 'individual', 1E-3], - # {Ords, normal/skew, ind/fam, deltaK} - [sc_tools.ords_from_regex(SC.RING, 'QD'), False, 'individual', 1E-4]) - - for n in range(6): - _, bpm_data, cm_data, fit_parameters, loco_flags, ring_data = at_wrapper.atloco(loco_meas_data, bpm_data, cm_data, - fit_parameters, loco_flags, ring_data) - SC = apply_lattice_correction(SC, fit_parameters) - SC = orbit_trajectory.correct(SC, MCO, alpha=50, target=0, maxsteps=30) - if n == 3: - loco_flags.Coupling = True - fit_parameters = loco_fit_parameters(SC, init.SC.RING, ring_data, RFstep, - [sc_tools.ords_from_regex(SC.RING, 'QF'), False, 'individual', 1E-3], - [sc_tools.ords_from_regex(SC.RING, 'QD'), False, 'individual', 1E-4], - [SC.ORD.SkewQuad, True, 'individual', 1E-3]) + # LOCO + + used_correctors1 = loco.select_equally_spaced_elements(SC.ORD.CM[0], 20) + used_correctors2 = loco.select_equally_spaced_elements(SC.ORD.CM[1], 20) + used_cor_ords = [used_correctors1, used_correctors2] + used_bpms_ords = loco.select_equally_spaced_elements(SC.ORD.BPM, len(SC.ORD.BPM)) + cav_ords = sc_tools.ords_from_regex(SC.RING, 'RFC') + quads_ords = [sc_tools.ords_from_regex(SC.RING, 'QF'), sc_tools.ords_from_regex(SC.RING, 'QD')] + sext_ords = [sc_tools.ords_from_regex(SC.RING, 'SF'), sc_tools.ords_from_regex(SC.RING, 'SD')] + + CMstep = np.array([1.e-4]) # correctors change [rad] + dk = 1.e-4 # quads change + RFstep = 1e3 + + + orbit_response_matrix_model = SCgetModelRM(SC, used_bpms_ords, used_cor_ords, trackMode='ORB', useIdealRing=True, dkick=CMstep) + _, _, twiss = at.get_optics(SC.IDEALRING, SC.ORD.BPM) + dx = twiss.dispersion[:, 0] + dy = twiss.dispersion[:, 2] + dispersion_meas = np.column_stack((dx, dy)) + orbit_response_matrix_model2 = np.hstack( + (orbit_response_matrix_model, ((dispersion_meas) / CMstep).reshape(-1, 1))) + print('Optics parameters before LOCO') + loco.analyze_ring(SC, twiss, SC.ORD.BPM, useIdealRing=False, makeplot=False) + + Jn = loco.calculate_jacobian(SC, orbit_response_matrix_model, CMstep, used_cor_ords, used_bpms_ords, np.concatenate(quads_ords), dk, + trackMode='ORB', useIdealRing=False, skewness=False, order=1, method='add', + includeDispersion=False, rf_step=RFstep, cav_ords=cav_ords, full_jacobian=True) + + Jn2 = loco.calculate_jacobian(SC, orbit_response_matrix_model2, CMstep, used_cor_ords, used_bpms_ords, np.concatenate(quads_ords), dk, + trackMode='ORB', useIdealRing=False, skewness=False, order=1, method='add', + includeDispersion=True, rf_step=RFstep, cav_ords=cav_ords, full_jacobian=False) + + Js = loco.calculate_jacobian(SC, orbit_response_matrix_model2, CMstep, used_cor_ords, used_bpms_ords, np.concatenate(sext_ords), 1.e-3, + trackMode='ORB', useIdealRing=False, skewness=True, order=1, method='add', + includeDispersion=True, rf_step=RFstep, cav_ords=cav_ords, full_jacobian=True) + + Jc = np.concatenate((Jn2, Js), axis=0) + + + + #Jn = np.transpose(Jn, (0, 2, 1)) + #weights = 1 + weights = np.eye(len(used_bpms_ords) * 2) + tmp = np.sum(Jn, axis=2) + A = tmp @ weights @ tmp.T + u, s, v = np.linalg.svd(A, full_matrices=True) + import matplotlib.pyplot as plt + plt.plot(np.log(s), 'd--') + plt.title('singular value') + plt.xlabel('singular values') + plt.ylabel('$\log(\sigma_i)$') + plt.show() + + n_singular_values = 80 + + #Jt = loco_test.get_inverse(Jn, n_singular_values, weights) + + _, _, twiss_err = at.get_optics(SC.RING, SC.ORD.BPM) + bx_rms_err, by_rms_err = loco.model_beta_beat(SC.RING, twiss, SC.ORD.BPM, plot=False) + info_tab = 14 * " " + LOGGER.info("RMS Beta-beating before LOCO:\n" + f"{info_tab}{bx_rms_err * 100:04.2f}% horizontal\n{info_tab}{by_rms_err * 100:04.2f}% vertical ") + n_iter = 2 + # beta beating iterations + for x in range(n_iter): # optics correction using QF and QD + LOGGER.info(f'LOCO iteration {x}') + orbit_response_matrix_measured = loco.measure_closed_orbit_response_matrix(SC, used_bpms_ords, used_cor_ords, CMstep, includeDispersion=False) + n_quads, n_corrs, n_bpms = len(np.concatenate(quads_ords)), len(np.concatenate(used_cor_ords)), len(used_bpms_ords) * 2 + bx_rms_err, by_rms_err = loco.model_beta_beat(SC.RING, twiss, SC.ORD.BPM, plot=False) + dx_rms_err, dy_rms_err = loco.getDispersionErr(SC.RING, twiss, SC.ORD.BPM) + total_length = n_bpms + n_corrs + n_quads + lengths = [n_quads, n_corrs, n_bpms] + including_fit_parameters = ['quads', 'cor', 'bpm'] + initial_guess = np.zeros(total_length) + + initial_guess[:lengths[0]] = 1e-6 + initial_guess[lengths[0]:lengths[0] + lengths[1]] = 1e-6 + initial_guess[lengths[0] + lengths[1]:] = 1e-6 + + + # method lm (least squares) + #fit_parameters = loco.loco_correction_lm(initial_guess, (orbit_response_matrix_model), + # (orbit_response_matrix_measured), np.array(Jn), lengths, + # including_fit_parameters, bounds=(-0.03, 0.03), weights=weights, verbose=2) + # method ng + fit_parameters = loco.loco_correction_ng(initial_guess, orbit_response_matrix_model, + orbit_response_matrix_measured, np.array(Jn), lengths, + including_fit_parameters, n_singular_values, weights=weights) + + dg = fit_parameters[:lengths[0]] if len(fit_parameters) > n_quads else fit_parameters + SC = loco.set_correction(SC, dg, np.concatenate(quads_ords)) + bx_rms_cor, by_rms_cor = loco.model_beta_beat(SC.RING, twiss, SC.ORD.BPM, plot=False) + dx_rms_cor, dy_rms_cor = loco.getDispersionErr(SC.RING, twiss, SC.ORD.BPM) + LOGGER.info(f"RMS Beta-beating after {x + 1} LOCO iterations:\n" + f"{info_tab}{bx_rms_cor * 100:04.2f}% horizontal\n{info_tab}{by_rms_cor * 100:04.2f}% vertical ") + LOGGER.info(f"Correction reduction: \n" + f" beta_x: {(1 - bx_rms_cor / bx_rms_err) * 100:.2f}%\n" + f" beta_y: {(1 - by_rms_cor / by_rms_err) * 100:.2f}%\n") + LOGGER.info(f"RMS dispersion after {x + 1} LOCO iterations:\n" + f"{info_tab}{dx_rms_cor * 100:04.2f}% horizontal\n{info_tab}{dy_rms_cor * 100:04.2f}% vertical ") + LOGGER.info(f"Correction reduction: \n" + f" dispersion_x: {(1 - dx_rms_cor / dx_rms_err) * 100:.2f}%\n" + f" dispersion_y: {(1 - dy_rms_cor / dy_rms_err) * 100:.2f}%\n") + print('Optics parameters after LOCO') + loco.analyze_ring(SC, twiss, SC.ORD.BPM, useIdealRing=False, makeplot=False) + + # dispersion correction iterations + weights = np.eye(len(used_bpms_ords) * 2) + tmp = np.sum(Jc, axis=2) + A = tmp @ weights @ tmp.T + u, s, v = np.linalg.svd(A, full_matrices=True) + import matplotlib.pyplot as plt + + plt.plot(np.log(s), 'd--') + plt.title('singular value') + plt.xlabel('singular values') + plt.ylabel('$\log(\sigma_i)$') + plt.show() + + n_singular_values = 75 + + n_iter = 1 + + for x in range(n_iter): # optics correction using normal and skew quads + LOGGER.info(f'LOCO iteration {x}') + print('Optics parameters after LOCO') + loco.analyze_ring(SC, twiss, SC.ORD.BPM, useIdealRing=False, makeplot=False) + + orbit_response_matrix_measured = loco.measure_closed_orbit_response_matrix(SC, used_bpms_ords, used_cor_ords, CMstep, includeDispersion=True) + + n_quads, n_corrs, n_bpms = len(np.concatenate(quads_ords)) + len(np.concatenate(sext_ords)), len(np.concatenate(used_cor_ords)), len(used_bpms_ords) * 2 + bx_rms_err, by_rms_err = loco.model_beta_beat(SC.RING, twiss, SC.ORD.BPM, plot=False) + dx_rms_err, dy_rms_err = loco.getDispersionErr(SC.RING, twiss, SC.ORD.BPM) + + total_length = n_bpms + n_corrs + n_quads + lengths = [n_quads, n_corrs, n_bpms] + including_fit_parameters = ['quads', 'cor', 'bpm'] + initial_guess = np.zeros(total_length) + + initial_guess[:lengths[0]] = 1e-6 + initial_guess[lengths[0]:lengths[0] + lengths[1]] = 1e-6 + initial_guess[lengths[0] + lengths[1]:] = 1e-6 + + # method lm (least squares) + #fit_parameters = loco.loco_correction_lm(initial_guess, (orbit_response_matrix_model2), + # (orbit_response_matrix_measured), Jc, lengths, + # including_fit_parameters, bounds=(-0.03, 0.03), weights=weights, verbose=2) + + # method ng + fit_parameters = loco.loco_correction_ng(initial_guess, orbit_response_matrix_model2, + orbit_response_matrix_measured, Jc, lengths, + including_fit_parameters, n_singular_values, weights=weights) + + dgn = fit_parameters[:len(np.concatenate(quads_ords))] #if len(fit_parameters) > n_quads else fit_parameters + dgs = fit_parameters[len(np.concatenate(quads_ords)):len(np.concatenate(sext_ords))+len(np.concatenate(quads_ords))] #if len(fit_parameters) > n_quads else fit_parameters + + SC = loco.set_correction(SC, dgn, np.concatenate(quads_ords)) + SC = loco.set_correction(SC, dgs, np.concatenate(sext_ords), skewness=True) + + bx_rms_cor, by_rms_cor = loco.model_beta_beat(SC.RING, twiss, SC.ORD.BPM, plot=False) + dx_rms_cor, dy_rms_cor = loco.getDispersionErr(SC.RING, twiss, SC.ORD.BPM) + LOGGER.info(f"RMS Beta-beating after {x + 1} LOCO iterations:\n" + f"{info_tab}{bx_rms_cor * 100:04.2f}% horizontal\n{info_tab}{by_rms_cor * 100:04.2f}% vertical ") + LOGGER.info(f"Correction reduction: \n" + f" beta_x: {(1 - bx_rms_cor / bx_rms_err) * 100:.2f}%\n" + f" beta_y: {(1 - by_rms_cor / by_rms_err) * 100:.2f}%\n") + LOGGER.info(f"RMS dispersion after {x + 1} LOCO iterations:\n" + f"{info_tab}{dx_rms_cor * 100:04.2f}% horizontal\n{info_tab}{dy_rms_cor * 100:04.2f}% vertical ") + LOGGER.info(f"Correction reduction: \n" + f" dispersion_x: {(1 - dx_rms_cor / dx_rms_err) * 100:.2f}%\n" + f" dispersion_y: {(1 - dy_rms_cor / dy_rms_err) * 100:.2f}%\n") + print('Optics parameters after LOCO') + loco.analyze_ring(SC, twiss, SC.ORD.BPM, useIdealRing=False, makeplot=False)