Skip to content

modularize emittance measurement #263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 132 additions & 2 deletions lcls_tools/common/data/emittance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
bmag_func,
propagate_twiss,
build_quad_rmat,
bdes_to_kmod,
)


Expand Down Expand Up @@ -112,7 +113,9 @@ def loss_torch(params):
params = torch.reshape(params, [*beamsize_squared.shape[:-2], 3])
sig = torch.stack(beam_matrix_tuple(params), dim=-1).unsqueeze(-1)
# sig should now be shape batchshape x 3 x 1 (column vectors)
total_squared_error = (amat @ sig - beamsize_squared).abs().sum()
total_squared_error = (
(torch.sqrt(amat @ sig) - torch.sqrt(beamsize_squared)).abs().sum()
)
return total_squared_error

def loss_jacobian(params):
Expand All @@ -130,7 +133,9 @@ def loss(params):
params = np.reshape(params, [*beamsize_squared.shape[:-2], 3])
sig = np.expand_dims(np.stack(beam_matrix_tuple(params), axis=-1), axis=-1)
# sig should now be shape batchshape x 3 x 1 (column vectors)
total_squared_error = np.sum(np.abs(amat @ sig - beamsize_squared))
total_squared_error = np.sum(
np.abs(np.sqrt(amat @ sig) - np.sqrt(beamsize_squared))
)
return total_squared_error

loss_jacobian = None
Expand Down Expand Up @@ -206,6 +211,131 @@ def _twiss_upstream(b_matrix):
return rv


def preprocess_inputs(quad_vals: list, beamsizes: list, energy: float, q_len: float):
"""
Preprocesses the inputs for compute_emit_bmag.

Parameters
----------
quad_vals : list
A list of two arrays containing the quadrupole values in kG for x and y respectively.
beamsizes : dict
A list of two arrays containing the beam sizes in meters for x and y respectively.
energy : float
The energy of the beam in eV.
q_len : float
The effective length of the quadrupole in meters.

Returns
-------
tuple
A tuple containing the list of kmod values and the list of beam sizes squared.
"""
kmod_list = []
beamsizes_squared_list = []

for i in range(2):
# Get rid of nans
idx = ~np.isnan(beamsizes[i])
q = quad_vals[i][idx]
b = beamsizes[i][idx]

# Beamsizes to mm squared
beamsizes_squared_list.append((b * 1e3) ** 2)

# Quad values to kmod
kmod = bdes_to_kmod(energy, q_len, q)

# Negate for y
if i == 1:
kmod = -1 * kmod

kmod_list.append(kmod)

return kmod_list, beamsizes_squared_list


def compute_emit_bmag_machine_units(
quad_vals: list,
beamsizes: list,
q_len: float,
rmat: np.ndarray,
energy: float,
twiss_design: np.ndarray,
thin_lens: bool = False,
maxiter: int = None,
):
"""
Wrapper for compute_emit_bmag that takes quads in machine units and beamsize in meters.

Parameters
----------
quad_vals : list
A list of two arrays containing the quadrupole values in kG for x and y respectively.
beamsizes : list
A list of two arrays containing the beam sizes in meters for x and y respectively.
q_len : float
The effective length of the quadrupole in meters.
rmat : np.ndarray
The R-matrix. Shape (2, 2, 2).
energy : float
The energy of the beam in eV.
twiss_design : np.ndarray or None
The design Twiss parameters. Shape (2, 2).
thin_lens : bool, optional
Whether to use the thin lens approximation. Default is False.
maxiter : int, optional
Maximum number of iterations for the optimization. Default is None.

Returns
-------
dict
The results of the emittance calculation.
""" # Preprocessing data
kmod_list, beamsizes_squared_list = preprocess_inputs(
quad_vals, beamsizes, energy, q_len
)

# Prepare outputs
results = {
"emittance": [],
"twiss_at_screen": [],
"beam_matrix": [],
"bmag": [] if twiss_design is not None else None,
"quadrupole_focusing_strengths": [],
"quadrupole_pv_values": [],
"rms_beamsizes": [],
}

# Then call compute_emit_bmag
# fit scans independently for x/y
# only keep data that has non-nan beam sizes -- independent for x/y
for i in range(2):
result = compute_emit_bmag(
k=kmod_list[i],
beamsize_squared=beamsizes_squared_list[i],
q_len=q_len,
rmat=rmat[i],
twiss_design=twiss_design[i] if twiss_design is not None else None,
thin_lens=thin_lens,
maxiter=maxiter,
)

result.update({"quadrupole_focusing_strengths": kmod_list[i]})
result.update({"quadrupole_pv_values": quad_vals[i][~np.isnan(beamsizes[i])]})

# add results to dict object
for name, value in result.items():
if name == "bmag" and value is None:
continue
else: # beam matrix and emittance get appended
results[name].append(value)

results["rms_beamsizes"].append(beamsizes[i][~np.isnan(beamsizes[i])])

return results


def normalize_emittance(emit, energy):
gamma = energy / (
0.511e-3
Expand Down
117 changes: 64 additions & 53 deletions lcls_tools/common/measurements/emittance_measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
PositiveFloat,
)

from lcls_tools.common.data.emittance import compute_emit_bmag
from lcls_tools.common.data.model_general_calcs import bdes_to_kmod, get_optics
from lcls_tools.common.data.emittance import (
compute_emit_bmag_machine_units,
)
from lcls_tools.common.data.model_general_calcs import get_optics
from lcls_tools.common.devices.magnet import Magnet
from lcls_tools.common.measurements.measurement import Measurement
from lcls_tools.common.measurements.utils import NDArrayAnnotatedType
from lcls_tools.common.measurements.screen_profile import ScreenBeamProfileMeasurement
import lcls_tools


Expand Down Expand Up @@ -180,7 +183,7 @@ class QuadScanEmittance(Measurement):
rmat: Optional[ndarray] = None
design_twiss: Optional[dict] = None # design twiss values

wait_time: PositiveFloat = 5.0
wait_time: PositiveFloat = 1.0

name: str = "quad_scan_emittance"
model_config = ConfigDict(arbitrary_types_allowed=True)
Expand All @@ -190,6 +193,17 @@ def validate_rmat(cls, v, info):
assert v.shape == (2, 2, 2)
return v

@field_validator("beamsize_measurement", mode="after")
def validate_beamsize_measurement(cls, v, info):
# make sure the beamsize measurement is a ScreenBeamProfileMeasurement
# TODO: add check for wire scanner or other type of measurement
if not isinstance(v, ScreenBeamProfileMeasurement):
raise ValueError(
"Beamsize measurement must be a ScreenBeamProfileMeasurement for QuadScanEmittance"
)

return v

def measure(self):
"""
Conduct quadrupole scan to measure the beam phase space.
Expand All @@ -203,6 +217,21 @@ def measure(self):
# scan magnet strength and measure beamsize
self.perform_beamsize_measurements()

# calculate emittance from the measured beam sizes and quadrupole strengths
return self.calculate_emittance()

def calculate_emittance(self):
"""

Calculate the emittance from the measured beam sizes and quadrupole strengths.

Returns:
-------
result : EmittanceMeasurementResult
Object containing the results of the emittance measurement

"""

# extract beam sizes from info
scan_values, beam_sizes = self._get_beamsizes_scan_values_from_info()

Expand All @@ -229,68 +258,50 @@ def measure(self):
if self.design_twiss:
twiss_betas_alphas = np.array(
[
[self.design_twiss["beta_x"], self.design_twiss["alpha_x"]],
[self.design_twiss["beta_y"], self.design_twiss["alpha_y"]],
[
self.design_twiss["beta_x"],
self.design_twiss["alpha_x"],
],
[
self.design_twiss["beta_y"],
self.design_twiss["alpha_y"],
],
]
)

else:
twiss_betas_alphas = None

# fit scans independently for x/y
# only keep data that has non-nan beam sizes -- independent for x/y
results = {
"emittance": [],
"twiss_at_screen": [],
"beam_matrix": [],
"bmag": [] if twiss_betas_alphas is not None else None,
"quadrupole_focusing_strengths": [],
"quadrupole_pv_values": [],
"rms_beamsizes": [],
inputs = {
"quad_vals": scan_values,
"beamsizes": beam_sizes,
"q_len": magnet_length,
"rmat": self.rmat,
"energy": self.energy,
"twiss_design": (
twiss_betas_alphas if twiss_betas_alphas is not None else None
),
}

for i in range(2):
single_beam_size = beam_sizes[i][~np.isnan(beam_sizes[i])]
beam_size_squared = (single_beam_size * 1e3) ** 2
kmod = bdes_to_kmod(
self.energy, magnet_length, scan_values[i][~np.isnan(beam_sizes[i])]
)

# negate for y
if i == 1:
kmod = -1 * kmod

# compute emittance and bmag
result = compute_emit_bmag(
k=kmod,
beamsize_squared=beam_size_squared.T,
q_len=magnet_length,
rmat=self.rmat[i],
twiss_design=twiss_betas_alphas[i]
if twiss_betas_alphas is not None
else None,
)
result.update({"quadrupole_focusing_strengths": kmod})
result.update(
{"quadrupole_pv_values": scan_values[i][~np.isnan(beam_sizes[i])]}
)

# add results to dict object
for name, value in result.items():
if name == "bmag" and value is None:
continue
else:
results[name].append(value)

results["rms_beamsizes"].append(single_beam_size)

results.update({"metadata": self.model_dump()})
# Call wrapper that takes quads in machine units and beamsize in meters
results = compute_emit_bmag_machine_units(**inputs)
results.update(
{
"metadata": self.model_dump()
| {
"resolution": self.beamsize_measurement.device.resolution,
"image_data": {
str(sval): ele.model_dump()
for sval, ele in zip(self.scan_values, self._info)
},
}
}
)

# collect information into EmittanceMeasurementResult object
return EmittanceMeasurementResult(**results)

def perform_beamsize_measurements(self):
"""Perform the beamsize measurements"""
"""Perform the beamsize measurements using a basic quadrupole scan."""
self.magnet.scan(scan_settings=self.scan_values, function=self.measure_beamsize)

def measure_beamsize(self):
Expand Down
Loading