From 5acac3c4d3ad7e36aa09622d72369464d82abba4 Mon Sep 17 00:00:00 2001 From: skostogl Date: Mon, 16 Oct 2023 11:56:51 +0200 Subject: [PATCH 1/2] fma analysis and plotting --- xtrack/footprint.py | 153 +++++++++++++++++++++++++++++++++++++++++++- xtrack/line.py | 88 ++++++++++++++++++++++++- 2 files changed, 239 insertions(+), 2 deletions(-) diff --git a/xtrack/footprint.py b/xtrack/footprint.py index c0018e981..25d878a53 100644 --- a/xtrack/footprint.py +++ b/xtrack/footprint.py @@ -59,7 +59,8 @@ def __init__(self, nemitt_x=None, nemitt_y=None, n_turns=256, n_fft=2**18, mode='polar', r_range=None, theta_range=None, n_r=None, n_theta=None, x_norm_range=None, y_norm_range=None, n_x_norm=None, n_y_norm=None, keep_fft=False, keep_tracking_data=False, - auto_to_numpy=True,fft_chunk_size=200 + auto_to_numpy=True,fft_chunk_size=200, + n_turns_1 = None, n_turns_2 = None, hann = 3 ): assert nemitt_x is not None and nemitt_y is not None, ( @@ -74,6 +75,10 @@ def __init__(self, nemitt_x=None, nemitt_y=None, n_turns=256, n_fft=2**18, self.nemitt_x = nemitt_x self.nemitt_y = nemitt_y + + self.n_turns_1 = n_turns_1 + self.n_turns_2 = n_turns_2 + self.hann = hann assert mode in ['polar', 'uniform_action_grid'], ( 'mode must be either polar or uniform_action_grid') @@ -215,6 +220,131 @@ def _compute_footprint(self, line, freeze_longitudinal=False, print ('Done computing footprint.') + + def _compute_fma(self, line, freeze_longitudinal=False, + delta0=None, zeta0=None): + + if freeze_longitudinal is None: + # In future we could detect if the line has frozen longitudinal plane + freeze_longitudinal = False + + nplike_lib = line._context.nplike_lib + + particles = line.build_particles( + x_norm=self.x_norm_2d.flatten(), y_norm=self.y_norm_2d.flatten(), + nemitt_x=self.nemitt_x, nemitt_y=self.nemitt_y, + zeta=zeta0, delta=delta0, + freeze_longitudinal=freeze_longitudinal, + method={True: '4d', False: '6d'}[freeze_longitudinal] + ) + + print('Tracking particles for fma...') + line.track(particles, num_turns=self.n_turns, turn_by_turn_monitor=True, + freeze_longitudinal=freeze_longitudinal) + print('Done tracking.') + + ctx2np = line._context.nparray_from_context_array + assert np.all(ctx2np(particles.state == 1)), ( + 'Some particles were lost during tracking') + mon = line.record_last_track + mon.auto_to_numpy = False + + if isinstance(line._context, xo.ContextPyopencl): + raise NotImplementedError( + 'Footprint calculation with Pyopencl not supported yet. ' + 'Let us know if you need this feature.') + # Could be implemented using xobject fft + + x_noCO = mon.x - nplike_lib.atleast_2d(mon.x.mean(axis=1)).T + y_noCO = mon.y - nplike_lib.atleast_2d(mon.y.mean(axis=1)).T + + freq_axis = nplike_lib.fft.rfftfreq(self.n_fft) + + npart = nplike_lib.shape(x_noCO)[0] + self.qx1 = nplike_lib.zeros(npart,dtype=float) + self.qy1 = nplike_lib.zeros(npart,dtype=float) + self.qx2 = nplike_lib.zeros(npart,dtype=float) + self.qy2 = nplike_lib.zeros(npart,dtype=float) + self.qy2 = nplike_lib.zeros(npart,dtype=float) + self.diffusion = nplike_lib.zeros(npart,dtype=float) + + if self.keep_fft: + self.fft_x_1 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex) + self.fft_y_1 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex) + self.fft_x_2 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex) + self.fft_y_2 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex) + + # Hann window + from scipy.special import factorial + def hann_window(turns, hann): + T = np.linspace(0, turns, num=turns, endpoint=True)*2.0*np.pi - np.pi*turns + return ((2.0**hann*factorial(hann)**2)/float(factorial(2*hann))) * (1.0 + np.cos(T/turns))**hann + + # Compute in chunks + iStart = 0 + print('Computing FFT..') + while iStart < npart: + iEnd = iStart + self.fft_chunk_size + if iEnd > npart: + iEnd = npart + # Apply window to each interval + hann_1 = hann_window(self.n_turns_1[1]-self.n_turns_1[0], self.hann) + hann_1 = np.tile(hann_1, (iEnd-iStart,1) ) + hann_2 = hann_window(self.n_turns_2[1]-self.n_turns_2[0], self.hann) + hann_2 = np.tile(hann_2, (iEnd-iStart,1) ) + signal_x_1 = x_noCO[iStart:iEnd,self.n_turns_1[0]:self.n_turns_1[1]] * hann_1 + signal_y_1 = y_noCO[iStart:iEnd,self.n_turns_1[0]:self.n_turns_1[1]] * hann_1 + signal_x_2 = x_noCO[iStart:iEnd,self.n_turns_2[0]:self.n_turns_2[1]] * hann_2 + signal_y_2 = y_noCO[iStart:iEnd,self.n_turns_2[0]:self.n_turns_2[1]] * hann_2 + + fft_x_1 = nplike_lib.fft.rfft(signal_x_1, n=self.n_fft) + fft_y_1 = nplike_lib.fft.rfft(signal_y_1, n=self.n_fft) + fft_x_2 = nplike_lib.fft.rfft(signal_x_2, n=self.n_fft) + fft_y_2 = nplike_lib.fft.rfft(signal_y_2, n=self.n_fft) + if self.keep_fft: + self.fft_x_1[iStart:iEnd,:] = fft_x_1 + self.fft_y_1[iStart:iEnd,:] = fft_y_1 + self.fft_x_2[iStart:iEnd,:] = fft_x_2 + self.fft_y_2[iStart:iEnd,:] = fft_y_2 + qx1 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_x_1), axis=1)] + qy1 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_y_1), axis=1)] + qx2 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_x_2), axis=1)] + qy2 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_y_2), axis=1)] + diffusion = np.sqrt((qx1-qx2)**2 + (qy1-qy2)**2) + diffusion = np.log10([1e-60 if x == 0 else x for x in diffusion]) + + self.qx1[iStart:iEnd] = qx1 + self.qy1[iStart:iEnd] = qy1 + self.qx2[iStart:iEnd] = qx2 + self.qy2[iStart:iEnd] = qy2 + self.diffusion[iStart:iEnd] = diffusion + iStart += self.fft_chunk_size + + self.qx1 = nplike_lib.reshape(self.qx1, self.x_norm_2d.shape) + self.qy1 = nplike_lib.reshape(self.qy1, self.y_norm_2d.shape) + self.qx2 = nplike_lib.reshape(self.qx2, self.x_norm_2d.shape) + self.qy2 = nplike_lib.reshape(self.qy2, self.y_norm_2d.shape) + self.diffusion = nplike_lib.reshape(self.diffusion, self.x_norm_2d.shape) + + if self.auto_to_numpy: + ctx2np = line._context.nparray_from_context_array + self.qx1 = ctx2np(self.qx1) + self.qy1 = ctx2np(self.qy1) + self.qx2 = ctx2np(self.qx2) + self.qy2 = ctx2np(self.qy2) + self.diffusion = ctx2np(self.diffusion) + if self.keep_fft: + self.fft_x_1 = ctx2np(self.fft_x_1) + self.fft_y_1 = ctx2np(self.fft_y_1) + self.fft_x_2 = ctx2np(self.fft_x_2) + self.fft_y_2 = ctx2np(self.fft_y_2) + + if self.keep_tracking_data: + self.tracking_data = mon + + print ('Done computing fma.') + + def _compute_tune_shift(self,_context,J1_2d,J1_grid,J2_2d,J2_grid,q,coherent_tune,epsilon): nplike_lib = _context.nplike_lib ctx2np = _context.nparray_from_context_array @@ -380,3 +510,24 @@ def plot(self, ax=None, **kwargs): ax.set_xlabel(r'$q_x$') ax.set_ylabel(r'$q_y$') + + def plot_fma(self, ax=None, s=1, vmin=-9, vmax=-4, cmap='jet', **kwargs): + import matplotlib.pyplot as plt + + if ax is None: + ax = plt.gca() + + labels = [None] * self.qx2.shape[1] + + if 'label' in kwargs: + label_str = kwargs['label'] + kwargs.pop('label') + labels[0] = label_str + + scatter = ax.scatter(self.qx2, self.qy2, c=self.diffusion, label=labels, s=s, vmin=vmin, vmax=vmax, cmap=cmap, **kwargs) + + ax.set_xlabel(r'$q_x$') + ax.set_ylabel(r'$q_y$') + cbar = plt.colorbar(scatter, ax=ax) + cbar.set_label(r'$\rm \log_{10}\left({\sqrt{\Delta Q_x^2 + \Delta Q_y^2}}\right)$') + diff --git a/xtrack/line.py b/xtrack/line.py index ae70e0661..d5b13520c 100644 --- a/xtrack/line.py +++ b/xtrack/line.py @@ -1367,6 +1367,92 @@ def get_footprint(self, nemitt_x=None, nemitt_y=None, n_turns=256, n_fft=2**18, return fp + + def get_fma(self, nemitt_x=None, nemitt_y=None, n_turns_1=(0,1000), n_turns_2=(1000, 2000), hann=3, n_fft=2**18, + mode='polar', r_range=None, theta_range=None, n_r=None, n_theta=None, + x_norm_range=None, y_norm_range=None, n_x_norm=None, n_y_norm=None, + freeze_longitudinal=None, delta0=None, zeta0=None, + keep_fft=True, keep_tracking_data=False): + + ''' + Compute the FMA for a beam with given emittences using tracking. + + Parameters + ---------- + + nemitt_x : float + Normalized emittance in the x-plane. + nemitt_y : float + Normalized emittance in the y-plane. + n_turns_1 : tuple of ints + First turn and last turn for first fft interval. + n_turns_2 : tuple of ints + First turn and last turn for second fft interval. + hann: int + Order of the hann window. Default is 3, reduce for noisy signals. + n_fft : int + Number of points for FFT (tracking data is zero-padded to this length). + mode : str + Mode for computing footprint. Options are 'polar' and 'uniform_action_grid'. + In 'polar' mode, the footprint is computed on a polar grid with + r_range and theta_range specifying the range of r and theta values ( + polar coordinates in the x_norm, y_norm plane). + In 'uniform_action_grid' mode, the footprint is computed on a uniform + grid in the action space (Jx, Jy). + r_range : tuple of floats + Range of r values for footprint in polar mode. Default is (0.1, 6) sigmas. + theta_range : tuple of floats + Range of theta values for footprint in polar mode. Default is + (0.05, pi / 2 - 0.05) radians. + n_r : int + Number of r values for footprint in polar mode. Default is 10. + n_theta : int + Number of theta values for footprint in polar mode. Default is 10. + x_norm_range : tuple of floats + Range of x_norm values for footprint in `uniform action grid` mode. + Default is (0.1, 6) sigmas. + y_norm_range : tuple of floats + Range of y_norm values for footprint in `uniform action grid` mode. + Default is (0.1, 6) sigmas. + n_x_norm : int + Number of x_norm values for footprint in `uniform action grid` mode. + Default is 10. + n_y_norm : int + Number of y_norm values for footprint in `uniform action grid` mode. + Default is 10. + freeze_longitudinal : bool + If True, the longitudinal coordinates are frozen during the particles + matching and the tracking. + delta0: float + Initial value of the delta coordinate. + zeta0: float + Initial value of the zeta coordinate. + + Returns + ------- + fp : Footprint + Footprint object containing footprint data (fp.qx1, fp.qy1, fp.qx2, fp.qy2, fp.diffusion). + + ''' + + kwargs = locals() + kwargs.pop('self') + + freeze_longitudinal = kwargs.pop('freeze_longitudinal') + delta0 = kwargs.pop('delta0') + zeta0 = kwargs.pop('zeta0') + + fp = Footprint(**kwargs) + fp.n_turns = n_turns_2[-1] + fp.n_turns_1 = n_turns_1 + fp.n_turns_2 = n_turns_2 + fp.hann = hann + fp._compute_fma(self, + freeze_longitudinal=freeze_longitudinal, + delta0=delta0, zeta0=zeta0) + + return fp + def compute_one_turn_matrix_finite_differences( self, particle_on_co, steps_r_matrix=None, @@ -3601,4 +3687,4 @@ def __init__(self, line, fields): self._cache[ff] = LineAttrItem(name=name, index=index, line=line) def __getitem__(self, key): - return self._cache[key].get_full_array() \ No newline at end of file + return self._cache[key].get_full_array() From 9df40fbdde4d8f3c088d868f39e55053a3d22919 Mon Sep 17 00:00:00 2001 From: skostogl Date: Mon, 16 Oct 2023 17:26:06 +0200 Subject: [PATCH 2/2] added very simple test fma --- tests/test_fma.py | 78 +++++++++++++++++++++++++++++++++++++++++++++ xtrack/footprint.py | 4 +-- 2 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 tests/test_fma.py diff --git a/tests/test_fma.py b/tests/test_fma.py new file mode 100644 index 000000000..b6c7d15da --- /dev/null +++ b/tests/test_fma.py @@ -0,0 +1,78 @@ +import pathlib + +import numpy as np +import pytest + +import xtrack as xt +import xpart as xp +import xobjects as xo + +from xobjects.test_helpers import for_all_test_contexts + +test_data_folder = pathlib.Path(__file__).parent.joinpath('../test_data').absolute() + +@for_all_test_contexts +@pytest.mark.parametrize('freeze_longitudinal', [True]) +def test_footprint(test_context, freeze_longitudinal): + + if isinstance(test_context, xo.ContextPyopencl): + pytest.skip('Pyopencl not yet supported for footprint') + return + + nemitt_x = 1e-6 + nemitt_y = 1e-6 + + line = xt.Line.from_json(test_data_folder / + 'hllhc15_noerrors_nobb/line_w_knobs_and_particle.json') + line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12) + line.build_tracker(_context=test_context) + + if freeze_longitudinal: + kwargs = {'freeze_longitudinal': True} + for ee in line.elements: + if isinstance(ee, xt.Cavity): + ee.voltage = 0 + else: + kwargs = {} + + line.vars['i_oct_b1'] = 0 + fp0 = line.get_fma(nemitt_x=nemitt_x, nemitt_y=nemitt_y, + **kwargs) + + line.vars['i_oct_b1'] = 500 + fp1 = line.get_fma(nemitt_x=nemitt_x, nemitt_y=nemitt_y, + n_r=11, n_theta=7, r_range=[0.05, 7], + theta_range=[0.01, np.pi/2-0.01], + keep_tracking_data=True, + **kwargs) + + + assert hasattr(fp1, 'tracking_data') + + assert hasattr(fp1, 'theta_grid') + assert hasattr(fp1, 'r_grid') + + assert hasattr(fp1, 'qx1') + assert hasattr(fp1, 'qx2') + assert hasattr(fp1, 'qy1') + assert hasattr(fp1, 'qy2') + assert hasattr(fp1, 'diffusion') + + assert len(fp1.r_grid) == 11 + assert len(fp1.theta_grid) == 7 + + assert np.isclose(fp1.r_grid[0], 0.05, rtol=0, atol=1e-10) + assert np.isclose(fp1.r_grid[-1], 7, rtol=0, atol=1e-10) + + assert np.isclose(fp1.theta_grid[0], 0.01, rtol=0, atol=1e-10) + assert np.isclose(fp1.theta_grid[-1], np.pi/2 - 0.01, rtol=0, atol=1e-10) + + #i_theta = 0, i_r = 0 + assert np.isclose(fp1.x_norm_2d[0, 0], 0.05, rtol=0, atol=1e-3) + assert np.isclose(fp1.y_norm_2d[0, 0], 0, rtol=0, atol=1e-3) + assert np.isclose(fp1.qx1[0, 0], 0.31, rtol=0, atol=5e-5) + assert np.isclose(fp1.qy1[0, 0], 0.32, rtol=0, atol=5e-5) + assert np.isclose(fp1.qx2[0, 0], 0.31, rtol=0, atol=5e-5) + assert np.isclose(fp1.qy2[0, 0], 0.32, rtol=0, atol=5e-5) + + diff --git a/xtrack/footprint.py b/xtrack/footprint.py index 25d878a53..7b47c085e 100644 --- a/xtrack/footprint.py +++ b/xtrack/footprint.py @@ -528,6 +528,6 @@ def plot_fma(self, ax=None, s=1, vmin=-9, vmax=-4, cmap='jet', **kwargs): ax.set_xlabel(r'$q_x$') ax.set_ylabel(r'$q_y$') - cbar = plt.colorbar(scatter, ax=ax) - cbar.set_label(r'$\rm \log_{10}\left({\sqrt{\Delta Q_x^2 + \Delta Q_y^2}}\right)$') + #cbar = plt.colorbar(scatter, ax=ax) + #cbar.set_label(r'$\rm \log_{10}\left({\sqrt{\Delta Q_x^2 + \Delta Q_y^2}}\right)$')