Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions tests/test_fma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pathlib

import numpy as np
import pytest

import xtrack as xt
import xpart as xp
import xobjects as xo

from xobjects.test_helpers import for_all_test_contexts

test_data_folder = pathlib.Path(__file__).parent.joinpath('../test_data').absolute()

@for_all_test_contexts
@pytest.mark.parametrize('freeze_longitudinal', [True])
def test_footprint(test_context, freeze_longitudinal):

if isinstance(test_context, xo.ContextPyopencl):
pytest.skip('Pyopencl not yet supported for footprint')
return

nemitt_x = 1e-6
nemitt_y = 1e-6

line = xt.Line.from_json(test_data_folder /
'hllhc15_noerrors_nobb/line_w_knobs_and_particle.json')
line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12)
line.build_tracker(_context=test_context)

if freeze_longitudinal:
kwargs = {'freeze_longitudinal': True}
for ee in line.elements:
if isinstance(ee, xt.Cavity):
ee.voltage = 0
else:
kwargs = {}

line.vars['i_oct_b1'] = 0
fp0 = line.get_fma(nemitt_x=nemitt_x, nemitt_y=nemitt_y,
**kwargs)

line.vars['i_oct_b1'] = 500
fp1 = line.get_fma(nemitt_x=nemitt_x, nemitt_y=nemitt_y,
n_r=11, n_theta=7, r_range=[0.05, 7],
theta_range=[0.01, np.pi/2-0.01],
keep_tracking_data=True,
**kwargs)


assert hasattr(fp1, 'tracking_data')

assert hasattr(fp1, 'theta_grid')
assert hasattr(fp1, 'r_grid')

assert hasattr(fp1, 'qx1')
assert hasattr(fp1, 'qx2')
assert hasattr(fp1, 'qy1')
assert hasattr(fp1, 'qy2')
assert hasattr(fp1, 'diffusion')

assert len(fp1.r_grid) == 11
assert len(fp1.theta_grid) == 7

assert np.isclose(fp1.r_grid[0], 0.05, rtol=0, atol=1e-10)
assert np.isclose(fp1.r_grid[-1], 7, rtol=0, atol=1e-10)

assert np.isclose(fp1.theta_grid[0], 0.01, rtol=0, atol=1e-10)
assert np.isclose(fp1.theta_grid[-1], np.pi/2 - 0.01, rtol=0, atol=1e-10)

#i_theta = 0, i_r = 0
assert np.isclose(fp1.x_norm_2d[0, 0], 0.05, rtol=0, atol=1e-3)
assert np.isclose(fp1.y_norm_2d[0, 0], 0, rtol=0, atol=1e-3)
assert np.isclose(fp1.qx1[0, 0], 0.31, rtol=0, atol=5e-5)
assert np.isclose(fp1.qy1[0, 0], 0.32, rtol=0, atol=5e-5)
assert np.isclose(fp1.qx2[0, 0], 0.31, rtol=0, atol=5e-5)
assert np.isclose(fp1.qy2[0, 0], 0.32, rtol=0, atol=5e-5)


153 changes: 152 additions & 1 deletion xtrack/footprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ def __init__(self, nemitt_x=None, nemitt_y=None, n_turns=256, n_fft=2**18,
mode='polar', r_range=None, theta_range=None, n_r=None, n_theta=None,
x_norm_range=None, y_norm_range=None, n_x_norm=None, n_y_norm=None,
keep_fft=False, keep_tracking_data=False,
auto_to_numpy=True,fft_chunk_size=200
auto_to_numpy=True,fft_chunk_size=200,
n_turns_1 = None, n_turns_2 = None, hann = 3
):

assert nemitt_x is not None and nemitt_y is not None, (
Expand All @@ -74,6 +75,10 @@ def __init__(self, nemitt_x=None, nemitt_y=None, n_turns=256, n_fft=2**18,

self.nemitt_x = nemitt_x
self.nemitt_y = nemitt_y

self.n_turns_1 = n_turns_1
self.n_turns_2 = n_turns_2
self.hann = hann

assert mode in ['polar', 'uniform_action_grid'], (
'mode must be either polar or uniform_action_grid')
Expand Down Expand Up @@ -215,6 +220,131 @@ def _compute_footprint(self, line, freeze_longitudinal=False,

print ('Done computing footprint.')


def _compute_fma(self, line, freeze_longitudinal=False,
delta0=None, zeta0=None):

if freeze_longitudinal is None:
# In future we could detect if the line has frozen longitudinal plane
freeze_longitudinal = False

nplike_lib = line._context.nplike_lib

particles = line.build_particles(
x_norm=self.x_norm_2d.flatten(), y_norm=self.y_norm_2d.flatten(),
nemitt_x=self.nemitt_x, nemitt_y=self.nemitt_y,
zeta=zeta0, delta=delta0,
freeze_longitudinal=freeze_longitudinal,
method={True: '4d', False: '6d'}[freeze_longitudinal]
)

print('Tracking particles for fma...')
line.track(particles, num_turns=self.n_turns, turn_by_turn_monitor=True,
freeze_longitudinal=freeze_longitudinal)
print('Done tracking.')

ctx2np = line._context.nparray_from_context_array
assert np.all(ctx2np(particles.state == 1)), (
'Some particles were lost during tracking')
mon = line.record_last_track
mon.auto_to_numpy = False

if isinstance(line._context, xo.ContextPyopencl):
raise NotImplementedError(
'Footprint calculation with Pyopencl not supported yet. '
'Let us know if you need this feature.')
# Could be implemented using xobject fft

x_noCO = mon.x - nplike_lib.atleast_2d(mon.x.mean(axis=1)).T
y_noCO = mon.y - nplike_lib.atleast_2d(mon.y.mean(axis=1)).T

freq_axis = nplike_lib.fft.rfftfreq(self.n_fft)

npart = nplike_lib.shape(x_noCO)[0]
self.qx1 = nplike_lib.zeros(npart,dtype=float)
self.qy1 = nplike_lib.zeros(npart,dtype=float)
self.qx2 = nplike_lib.zeros(npart,dtype=float)
self.qy2 = nplike_lib.zeros(npart,dtype=float)
self.qy2 = nplike_lib.zeros(npart,dtype=float)
self.diffusion = nplike_lib.zeros(npart,dtype=float)

if self.keep_fft:
self.fft_x_1 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex)
self.fft_y_1 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex)
self.fft_x_2 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex)
self.fft_y_2 = nplike_lib.zeros((npart,len(freq_axis)),dtype=complex)

# Hann window
from scipy.special import factorial
def hann_window(turns, hann):
T = np.linspace(0, turns, num=turns, endpoint=True)*2.0*np.pi - np.pi*turns
return ((2.0**hann*factorial(hann)**2)/float(factorial(2*hann))) * (1.0 + np.cos(T/turns))**hann

# Compute in chunks
iStart = 0
print('Computing FFT..')
while iStart < npart:
iEnd = iStart + self.fft_chunk_size
if iEnd > npart:
iEnd = npart
# Apply window to each interval
hann_1 = hann_window(self.n_turns_1[1]-self.n_turns_1[0], self.hann)
hann_1 = np.tile(hann_1, (iEnd-iStart,1) )
hann_2 = hann_window(self.n_turns_2[1]-self.n_turns_2[0], self.hann)
hann_2 = np.tile(hann_2, (iEnd-iStart,1) )
signal_x_1 = x_noCO[iStart:iEnd,self.n_turns_1[0]:self.n_turns_1[1]] * hann_1
signal_y_1 = y_noCO[iStart:iEnd,self.n_turns_1[0]:self.n_turns_1[1]] * hann_1
signal_x_2 = x_noCO[iStart:iEnd,self.n_turns_2[0]:self.n_turns_2[1]] * hann_2
signal_y_2 = y_noCO[iStart:iEnd,self.n_turns_2[0]:self.n_turns_2[1]] * hann_2

fft_x_1 = nplike_lib.fft.rfft(signal_x_1, n=self.n_fft)
fft_y_1 = nplike_lib.fft.rfft(signal_y_1, n=self.n_fft)
fft_x_2 = nplike_lib.fft.rfft(signal_x_2, n=self.n_fft)
fft_y_2 = nplike_lib.fft.rfft(signal_y_2, n=self.n_fft)
if self.keep_fft:
self.fft_x_1[iStart:iEnd,:] = fft_x_1
self.fft_y_1[iStart:iEnd,:] = fft_y_1
self.fft_x_2[iStart:iEnd,:] = fft_x_2
self.fft_y_2[iStart:iEnd,:] = fft_y_2
qx1 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_x_1), axis=1)]
qy1 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_y_1), axis=1)]
qx2 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_x_2), axis=1)]
qy2 = freq_axis[nplike_lib.argmax(nplike_lib.abs(fft_y_2), axis=1)]
diffusion = np.sqrt((qx1-qx2)**2 + (qy1-qy2)**2)
diffusion = np.log10([1e-60 if x == 0 else x for x in diffusion])

self.qx1[iStart:iEnd] = qx1
self.qy1[iStart:iEnd] = qy1
self.qx2[iStart:iEnd] = qx2
self.qy2[iStart:iEnd] = qy2
self.diffusion[iStart:iEnd] = diffusion
iStart += self.fft_chunk_size

self.qx1 = nplike_lib.reshape(self.qx1, self.x_norm_2d.shape)
self.qy1 = nplike_lib.reshape(self.qy1, self.y_norm_2d.shape)
self.qx2 = nplike_lib.reshape(self.qx2, self.x_norm_2d.shape)
self.qy2 = nplike_lib.reshape(self.qy2, self.y_norm_2d.shape)
self.diffusion = nplike_lib.reshape(self.diffusion, self.x_norm_2d.shape)

if self.auto_to_numpy:
ctx2np = line._context.nparray_from_context_array
self.qx1 = ctx2np(self.qx1)
self.qy1 = ctx2np(self.qy1)
self.qx2 = ctx2np(self.qx2)
self.qy2 = ctx2np(self.qy2)
self.diffusion = ctx2np(self.diffusion)
if self.keep_fft:
self.fft_x_1 = ctx2np(self.fft_x_1)
self.fft_y_1 = ctx2np(self.fft_y_1)
self.fft_x_2 = ctx2np(self.fft_x_2)
self.fft_y_2 = ctx2np(self.fft_y_2)

if self.keep_tracking_data:
self.tracking_data = mon

print ('Done computing fma.')


def _compute_tune_shift(self,_context,J1_2d,J1_grid,J2_2d,J2_grid,q,coherent_tune,epsilon):
nplike_lib = _context.nplike_lib
ctx2np = _context.nparray_from_context_array
Expand Down Expand Up @@ -380,3 +510,24 @@ def plot(self, ax=None, **kwargs):

ax.set_xlabel(r'$q_x$')
ax.set_ylabel(r'$q_y$')

def plot_fma(self, ax=None, s=1, vmin=-9, vmax=-4, cmap='jet', **kwargs):
import matplotlib.pyplot as plt

if ax is None:
ax = plt.gca()

labels = [None] * self.qx2.shape[1]

if 'label' in kwargs:
label_str = kwargs['label']
kwargs.pop('label')
labels[0] = label_str

scatter = ax.scatter(self.qx2, self.qy2, c=self.diffusion, label=labels, s=s, vmin=vmin, vmax=vmax, cmap=cmap, **kwargs)

ax.set_xlabel(r'$q_x$')
ax.set_ylabel(r'$q_y$')
#cbar = plt.colorbar(scatter, ax=ax)
#cbar.set_label(r'$\rm \log_{10}\left({\sqrt{\Delta Q_x^2 + \Delta Q_y^2}}\right)$')

88 changes: 87 additions & 1 deletion xtrack/line.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,92 @@ def get_footprint(self, nemitt_x=None, nemitt_y=None, n_turns=256, n_fft=2**18,

return fp


def get_fma(self, nemitt_x=None, nemitt_y=None, n_turns_1=(0,1000), n_turns_2=(1000, 2000), hann=3, n_fft=2**18,
mode='polar', r_range=None, theta_range=None, n_r=None, n_theta=None,
x_norm_range=None, y_norm_range=None, n_x_norm=None, n_y_norm=None,
freeze_longitudinal=None, delta0=None, zeta0=None,
keep_fft=True, keep_tracking_data=False):

'''
Compute the FMA for a beam with given emittences using tracking.

Parameters
----------

nemitt_x : float
Normalized emittance in the x-plane.
nemitt_y : float
Normalized emittance in the y-plane.
n_turns_1 : tuple of ints
First turn and last turn for first fft interval.
n_turns_2 : tuple of ints
First turn and last turn for second fft interval.
hann: int
Order of the hann window. Default is 3, reduce for noisy signals.
n_fft : int
Number of points for FFT (tracking data is zero-padded to this length).
mode : str
Mode for computing footprint. Options are 'polar' and 'uniform_action_grid'.
In 'polar' mode, the footprint is computed on a polar grid with
r_range and theta_range specifying the range of r and theta values (
polar coordinates in the x_norm, y_norm plane).
In 'uniform_action_grid' mode, the footprint is computed on a uniform
grid in the action space (Jx, Jy).
r_range : tuple of floats
Range of r values for footprint in polar mode. Default is (0.1, 6) sigmas.
theta_range : tuple of floats
Range of theta values for footprint in polar mode. Default is
(0.05, pi / 2 - 0.05) radians.
n_r : int
Number of r values for footprint in polar mode. Default is 10.
n_theta : int
Number of theta values for footprint in polar mode. Default is 10.
x_norm_range : tuple of floats
Range of x_norm values for footprint in `uniform action grid` mode.
Default is (0.1, 6) sigmas.
y_norm_range : tuple of floats
Range of y_norm values for footprint in `uniform action grid` mode.
Default is (0.1, 6) sigmas.
n_x_norm : int
Number of x_norm values for footprint in `uniform action grid` mode.
Default is 10.
n_y_norm : int
Number of y_norm values for footprint in `uniform action grid` mode.
Default is 10.
freeze_longitudinal : bool
If True, the longitudinal coordinates are frozen during the particles
matching and the tracking.
delta0: float
Initial value of the delta coordinate.
zeta0: float
Initial value of the zeta coordinate.

Returns
-------
fp : Footprint
Footprint object containing footprint data (fp.qx1, fp.qy1, fp.qx2, fp.qy2, fp.diffusion).

'''

kwargs = locals()
kwargs.pop('self')

freeze_longitudinal = kwargs.pop('freeze_longitudinal')
delta0 = kwargs.pop('delta0')
zeta0 = kwargs.pop('zeta0')

fp = Footprint(**kwargs)
fp.n_turns = n_turns_2[-1]
fp.n_turns_1 = n_turns_1
fp.n_turns_2 = n_turns_2
fp.hann = hann
fp._compute_fma(self,
freeze_longitudinal=freeze_longitudinal,
delta0=delta0, zeta0=zeta0)

return fp

def compute_one_turn_matrix_finite_differences(
self, particle_on_co,
steps_r_matrix=None,
Expand Down Expand Up @@ -3601,4 +3687,4 @@ def __init__(self, line, fields):
self._cache[ff] = LineAttrItem(name=name, index=index, line=line)

def __getitem__(self, key):
return self._cache[key].get_full_array()
return self._cache[key].get_full_array()