from ..common.abstract import ReadFloatArray
from ..common.constants import Action
from ..common.element import ElementConfigModel
from ..common.exception import PyAMLException
from ..tuning_tools.measurement_tool import MeasurementTool, MeasurementToolConfigModel
try:
from typing import Self # Python 3.11+
except ImportError:
from typing_extensions import Self # Python 3.10 and earlier
import logging
from time import sleep
import matplotlib.pyplot as plt
import numpy as np
from pydantic import ConfigDict
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Name of the monitor class defined in this module; the spelling
# ("Chomaticity") is the same as the class name used in this file.
# NOTE(review): presumably read by the pyaml loader to locate the class
# to instantiate — confirm against the loader before renaming anything.
PYAMLCLASS = "ChomaticityMonitor"
[docs]
class ConfigModel(MeasurementToolConfigModel):
    """
    Settings accepted by the chromaticity monitor.

    Unknown keys are rejected (``extra="forbid"``).

    Parameters
    ----------
    betatron_tune_name : str
        Name of the pyaml diagnostic device used to measure the betatron tune.
    rf_plant_name : str
        Name of the main RF frequency plant.
    bpm_array_name : str, optional
        Name of the BPM array used for the dispersion fit, by default None.
    e_delta : float, optional
        Default relative energy excursion applied during a chromaticity
        measurement:
        f0 - f0 * e_delta * alphac < f_RF < f0 + f0 * e_delta * alphac,
        by default 0.001.
    max_e_delta : float, optional
        Largest authorized relative energy excursion during a chromaticity
        measurement, by default 0.004.
    fit_order : int, optional
        Polynomial order of the chromaticity fit, by default 1.
    fit_disp_order : int, optional
        Polynomial order of the dispersion fit, by default 1.
    fit_dispersion : bool, optional
        Whether the dispersion is fitted as well, by default False.
    """

    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    # --- devices -----------------------------------------------------------
    betatron_tune_name: str
    rf_plant_name: str
    bpm_array_name: str | None = None

    # --- energy scan -------------------------------------------------------
    e_delta: float = 0.001
    max_e_delta: float = 0.004

    # --- fitting -----------------------------------------------------------
    fit_order: int = 1
    fit_disp_order: int = 1
    fit_dispersion: bool = False
[docs]
class RChromaDispArray(ReadFloatArray):
    """
    Read-only accessor for one entry of the parent's latest measurement.

    The entry is looked up by name in ``parent.latest_measurement`` every
    time :meth:`get` is called, so the accessor always reflects the most
    recent measurement of the parent monitor.
    """

    def __init__(self, parent: "ChomaticityMonitor", name: str, unit: str):
        """
        Parameters
        ----------
        parent : ChomaticityMonitor
            Monitor whose ``latest_measurement`` mapping is read.
        name : str
            Key of the entry to expose (e.g. ``"chromaticity"``).
        unit : str
            Unit string reported by :meth:`unit`.
        """
        self._parent = parent
        self._name = name
        self._unit = unit

    def get(self) -> np.ndarray | None:
        """
        Return the stored entry as a new numpy array.

        Returns
        -------
        np.ndarray or None
            Array built from ``latest_measurement[name]``, or ``None`` when
            there is no measurement yet or it has no entry with that name.
            The shape is whatever was stored under that key.
        """
        last = self._parent.latest_measurement
        if last is None or self._name not in last:
            return None
        return np.array(last[self._name])

    def unit(self) -> str:
        """Return the unit string given at construction."""
        # Must read the private attribute: ``self.unit`` is this very method.
        return self._unit
[docs]
class ChomaticityMonitor(MeasurementTool):
    """
    Chromaticity monitor of a physical or simulated lattice.

    The monitor scans the RF frequency around its current value, records the
    betatron tune (and optionally the BPM orbit) at each step, and fits the
    horizontal and vertical chromaticity (and dispersion) from the data.
    """

    def __init__(self, cfg: ConfigModel):
        """
        Construct a ChomaticityMonitor.

        Parameters
        ----------
        cfg : ConfigModel
            Configuration for the monitor, including the betatron tune
            monitor name, the RF plant name and the default parameters.
        """
        super().__init__(cfg.name)
        self._cfg = cfg
        self._chromaticity = RChromaDispArray(self, "chromaticity", "1")
        self._dispersion = RChromaDispArray(self, "dispersion", "m")
        # Momentum compaction factor; stays None until set_mcf() is called.
        # measure() refuses to run without it (or an explicit alphac).
        self._alphac = None

    @property
    def chromaticity(self) -> ReadFloatArray:
        """
        Get the chromaticity values.

        Returns
        -------
        ReadFloatArray
            Accessor to the first order chromaticity [q'x, q'y] of the
            latest measurement.
        """
        return self._chromaticity

    def set_mcf(self, alphac: float):
        """
        Set the momentum compaction factor used by default in :meth:`measure`.

        Parameters
        ----------
        alphac : float
            Momentum compaction factor.
        """
        self._alphac = alphac

    @property
    def dispersion(self) -> ReadFloatArray:
        """
        Get the dispersion values.

        Returns
        -------
        ReadFloatArray
            Accessor to the first order dispersion of the latest measurement,
            stored as [[dx'0,..,dx'n],[dy'0,..,dy'n]] (one value per BPM).
        """
        return self._dispersion

    def measure(
        self,
        n_step: int | None = None,
        alphac: float | None = None,
        e_delta: float | None = None,
        max_e_delta: float | None = None,
        n_avg_meas: int | None = None,
        sleep_between_meas: float | None = None,
        sleep_between_step: float | None = None,
        fit_order: int | None = None,
        fit_disp_order: int | None = None,
        fit_dispersion: bool | None = None,
        do_plot: bool | None = None,
        callback: callable = None,
    ) -> bool:
        """
        Main function for chromaticity measurement.

        The RF frequency is stepped over ``n_step`` values, the tune (and the
        orbit when dispersion is fitted) is averaged at each step, the RF
        frequency is always restored to its initial value, and the data is
        passed to :meth:`fit`.

        :py:attr:`~pyaml.tuning_tools.measurement_tool.MeasurementTool.latest_measurement`
        contains (see :meth:`fit`):

        .. code-block:: python

            chromaticity:np.array      # First order chromaticity, [q'x,q'y]
            chromaticity_fit:np.array  # [[qx,q'x,q''x,...],[qy,q'y,q''y,...]]
            dispersion:list            # First order, [[dx'0,..,dx'n],[dy'0,..,dy'n]]
            dispersionx_fit:np.array   # One row per BPM, [[dx0,dx'0,...],..,[dxn,dx'n,...]]
            dispersiony_fit:np.array   # One row per BPM, [[dy0,dy'0,...],..,[dyn,dy'n,...]]

        The dispersion entries are only written when an orbit was recorded.

        Parameters
        ----------
        n_step: int
            Number of RF steps during the measurement [default: from config]
        alphac: float | None
            Momentum compaction factor [default: value given to set_mcf()]
        e_delta: float
            Variation of relative energy during the measurement:
            f0 - f0 * e_delta * alphac < f_RF < f0 + f0 * e_delta * alphac
            [default: from config]
        max_e_delta: float
            Maximum authorized variation of relative energy. Exceeding it
            only logs a warning; the scan range is not modified
            [default: from config]
        n_avg_meas: int
            Number of tune/orbit measurements per RF frequency
            [default: from config]
        sleep_between_meas: float
            Sleep time between two tune measurements [default: from config]
        sleep_between_step: float
            Sleep time after an RF frequency variation [default: from config]
        fit_order: int
            Chromaticity fitting order [default: from config]
        fit_disp_order : int, optional
            Dispersion fitting order [default: from config]
        fit_dispersion : bool, optional
            Fit dispersion. The orbit is only recorded when a BPM array name
            is configured [default: from config]
        do_plot : bool
            Plot the fitting results.
        callback: Callable, optional
            Callback executed after each setting or measurement.
            If the callback returns false, the process is aborted
            (handled by the MeasurementTool callback machinery, not here).
            The data passed by this method is:

            .. code-block:: python

                step:int        # The current step (all actions)
                rf:float        # RF frequency of the current step (all actions)
                avg_step:int    # The current averaging step (Action.MEASURE)
                tune:np.array   # The measured tune (Action.MEASURE)
                orbit:np.array  # The measured orbit, if recorded (Action.MEASURE)

        Returns
        -------
        bool
            True when the measurement and the fit completed, False when the
            scan was interrupted by a KeyboardInterrupt.

        Raises
        ------
        PyAMLException
            If no momentum compaction factor is available.
        Exception
            Any error raised during the scan is re-raised after the RF
            frequency has been restored.
        """
        n_step = n_step if n_step is not None else self._cfg.n_step
        alphac = alphac if alphac is not None else self._alphac
        e_delta = e_delta if e_delta is not None else self._cfg.e_delta
        max_e_delta = max_e_delta if max_e_delta is not None else self._cfg.max_e_delta
        n_avg_meas = n_avg_meas if n_avg_meas is not None else self._cfg.n_avg_meas
        sleep_between_meas = (
            sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas
        )
        sleep_between_step = (
            sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step
        )
        fit_order = fit_order if fit_order is not None else self._cfg.fit_order
        fit_disp_order = fit_disp_order if fit_disp_order is not None else self._cfg.fit_disp_order
        fit_dispersion = fit_dispersion if fit_dispersion is not None else self._cfg.fit_dispersion

        if abs(e_delta) > abs(max_e_delta):
            logger.warning(f"e_delta={e_delta} is greater than max_e_delta={max_e_delta}")
        if alphac is None:
            raise PyAMLException("Moment compaction factor is not defined")

        self._init_measure("")
        self._register_callback(callback)

        # Get devices
        self.check_peer()
        tm = self.peer.get_betatron_tune_monitor(self._cfg.betatron_tune_name)
        rf = self.peer.get_rf_plant(self._cfg.rf_plant_name)
        bpms = None
        n_bpm = 0
        orbit = None
        if fit_dispersion and fit_disp_order is not None and self._cfg.bpm_array_name is not None:
            # For dispersion fit
            bpms = self.peer.get_bpms(self._cfg.bpm_array_name)
            n_bpm = len(bpms)

        f0 = rf.frequency.get()
        delta = np.linspace(-e_delta, e_delta, n_step)
        # A positive energy deviation corresponds to a lower RF frequency.
        delta_frec = -delta * alphac * f0
        Q = np.zeros((n_step, 2))
        if bpms is not None:
            orbit = np.zeros((n_step, n_bpm, 2))

        # Ensure that, even if there is an issue, the scan finishes by
        # resetting the RF frequency to its original value.
        err = None
        aborted = False
        # Pre-bound so the restore callback below never hits an unbound
        # loop variable when the scan is empty.
        step = 0
        try:
            for step, f in enumerate(delta_frec):
                # TODO : Use set_and_wait once it is implemented !
                rf.frequency.set(f0 + f)
                self.send_callback(Action.APPLY, {"step": step, "rf": float(f0 + f)})
                sleep(sleep_between_step)
                # Averaging
                for j in range(n_avg_meas):
                    tune = tm.tune.get()
                    Q[step] += tune
                    cb_data = {"step": step, "avg_step": j, "rf": float(f0 + f), "tune": tune}
                    if bpms is not None:
                        orb = bpms.positions.get()
                        orbit[step] += orb
                        cb_data["orbit"] = orb
                    self.send_callback(Action.MEASURE, cb_data)
                    # No need to wait after the last sample of a step.
                    if j < n_avg_meas - 1:
                        sleep(sleep_between_meas)
                Q[step] /= float(n_avg_meas)
                if bpms is not None:
                    orbit[step] /= float(n_avg_meas)
        except Exception as ex:
            err = ex
        except KeyboardInterrupt:
            aborted = True
        finally:
            # Restore
            rf.frequency.set(f0)
            self.send_callback(Action.RESTORE, {"step": step, "rf": f0}, raiseException=False)

        if err is not None:
            raise err
        if aborted:
            logger.warning(f"{self.get_name()} : measurement aborted")
            return False

        if fit_dispersion:
            self.fit(delta, Q, fit_order, orbit=orbit, fit_disp_order=fit_disp_order, do_plot=do_plot)
        else:
            self.fit(delta, Q, fit_order, do_plot=do_plot)
        return True

    def fit(self, deltas, Q, order, orbit=None, fit_disp_order=None, do_plot=False):
        """
        Compute chromaticity (and dispersion) from input data and update
        :py:attr:`~pyaml.tuning_tools.measurement_tool.MeasurementTool.latest_measurement`.

        Parameters
        ----------
        deltas : array of float
            Relative energy (delta) variation steps done.
        Q : array of [Qx,Qy]
            Horizontal and vertical tune measured at each step.
        order : int
            Chromaticity fitting order. Must be at least 1, since the first
            order coefficient is extracted.
        orbit : array of [[x0,y0],[x1,y1],...], optional
            Orbit measured at each step, indexed as [step, bpm, plane].
            The dispersion is fitted only when it is given.
        fit_disp_order : int, optional
            Dispersion fitting order, required when ``orbit`` is given.
        do_plot : bool, optional
            If True, plot the fit.
        """
        # Accept plain sequences: the arithmetic and slicing below need arrays.
        deltas = np.asarray(deltas)
        Q = np.asarray(Q)
        polyfit = np.polynomial.polynomial.polyfit

        # polyfit returns coefficients lowest order first, one column per
        # plane; transpose to get one row per plane.
        chroma = polyfit(deltas, Q, order).T
        self.latest_measurement["chromaticity_fit"] = chroma
        self.latest_measurement["chromaticity"] = chroma[:, 1]  # First order chroma

        dispx = None
        dispy = None
        if orbit is not None:
            orbit = np.asarray(orbit)
            # One row per BPM after the transpose.
            dispx = polyfit(deltas, orbit[:, :, 0], fit_disp_order).T
            self.latest_measurement["dispersionx_fit"] = dispx
            dispy = polyfit(deltas, orbit[:, :, 1], fit_disp_order).T
            self.latest_measurement["dispersiony_fit"] = dispy
            # First order dispersion
            self.latest_measurement["dispersion"] = [dispx[:, 1], dispy[:, 1]]

        if do_plot:
            self._plot_fit(deltas, Q, order, chroma, dispx, dispy)

    def _plot_fit(self, deltas, Q, order, chroma, dispx=None, dispy=None):
        """Plot the tune fits and, when both are available, the dispersions."""
        # Key the layout on the dispersion actually computed, not on the
        # requested order: no orbit means there is nothing to plot.
        has_dispersion = dispx is not None and dispy is not None
        if has_dispersion:
            fig = plt.figure("Chromaticity/Dispersion measurement")
            cols = 2
        else:
            fig = plt.figure("Chromaticity measurement")
            cols = 1

        percent = deltas * 100
        for i, plane in enumerate(("Horizontal", "Vertical")):
            ax = fig.add_subplot(2, cols, 1 + i)
            ax.scatter(percent, Q[:, i])
            # Title is the fitted polynomial, highest order first.
            terms = []
            for o in range(order, -1, -1):
                if o == 0:
                    dp = ""
                elif o == 1:
                    dp = "dp/p"
                else:
                    # Braces keep multi-digit exponents fully superscripted.
                    dp = f"(dp/p)$^{{{o}}}$"
                terms.append(f"{chroma[i][o]:.4f} {dp}")
            # np.polyval expects the highest order coefficient first.
            ax.plot(percent, np.polyval(chroma[i][::-1], deltas))
            ax.set_title(" + ".join(terms))
            ax.set_ylabel(f"{plane} Tune")
            ax.set_xlabel("Momentum Shift, dp/p [%]")

        if has_dispersion:
            ax = fig.add_subplot(2, cols, 3)
            ax.plot(dispx[:, 1])
            ax.set_xlabel("BPM #")
            ax.set_ylabel("Dispersion [m]")
            ax = fig.add_subplot(2, cols, 4)
            ax.plot(dispy[:, 1])
            ax.set_xlabel("BPM #")
            ax.set_ylabel("Dispersion [m]")

        fig.tight_layout()
        plt.show()