Source code for pyaml.tuning_tools.chromaticity_monitor

from ..common.abstract import ReadFloatArray
from ..common.constants import Action
from ..common.element import ElementConfigModel
from ..common.exception import PyAMLException
from ..tuning_tools.measurement_tool import MeasurementTool, MeasurementToolConfigModel

try:
    from typing import Self  # Python 3.11+
except ImportError:
    from typing_extensions import Self  # Python 3.10 and earlier
import logging
from time import sleep

import matplotlib.pyplot as plt
import numpy as np
from pydantic import ConfigDict

logger = logging.getLogger(__name__)

PYAMLCLASS = "ChomaticityMonitor"


[docs] class ConfigModel(MeasurementToolConfigModel): """ Configuration model for Chromaticity Monitor. Parameters ---------- betatron_tune_name : str Name of the diagnostic pyaml device for measuring the tune rf_plant_name : str Name of main RF frequency plant bpm_array_name : str,optional Name of main BPM array used for dispersion fit e_delta : float, optional Default variation of relative energy during chromaticity measurement: f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac, by default 0.001 max_e_delta : float, optional Maximum authorized variation of relative energy during chromaticity measurement, by default 0.004 fit_order : int, optional Chomaticity fitting order, by default 1 fit_disp_order : int, optional Dispersion fitting order, by default 1 fit_dispersion : bool, optional Dispersion fitting, by default False """ model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") betatron_tune_name: str rf_plant_name: str bpm_array_name: str | None = None e_delta: float = 0.001 max_e_delta: float = 0.004 fit_order: int = 1 fit_disp_order: int = 1 fit_dispersion: bool = False
[docs] class RChromaDispArray(ReadFloatArray): """ Class providing read access to chromaticity or dispersion. Returns arrays of shape (fit_order,2) or None """ def __init__(self, parent: "ChomaticityMonitor", name: str, unit: str): self._parent = parent self._name = name self._unit = unit
[docs] def get(self) -> np.array: last = self._parent.latest_measurement if last is not None and self._name in last: return np.array(last[self._name]) else: return None
[docs] def unit(self) -> str: return self.unit
[docs] class ChomaticityMonitor(MeasurementTool): """ Class providing access to a chromaticity monitor of a physical or simulated lattice. The monitor provides horizontal and vertical chromaticity measurements. """ def __init__(self, cfg: ConfigModel): """ Construct a ChomaticityMonitor. Parameters ---------- cfg : ConfigModel Configuration for the ChromaticityMonitor, including betatron tune monitor, RF plant, and defaults parameters. """ super().__init__(cfg.name) self._cfg = cfg self._chromaticity = RChromaDispArray(self, "chromaticity", "1") self._dipsersion = RChromaDispArray(self, "dispersion", "m") self._alphac = None @property def chromaticity(self) -> ReadFloatArray: """ Get the chromaticity values. Returns ------- ReadFloatArray chromaticity values [q'x, q'y] """ return self._chromaticity
[docs] def set_mcf(self, alphac: float): self._alphac = alphac
@property def dispersion(self) -> ReadFloatArray: """ Get the dispersion values. Returns ------- ReadFloatArray Array of dispersion values [[dx, dy],[d'x, d'y],...] """ return self._dipsersion
[docs] def measure( self, n_step: int = None, alphac: float = None, e_delta: float = None, max_e_delta: float = None, n_avg_meas: int = None, sleep_between_meas: float = None, sleep_between_step: float = None, fit_order: int = None, fit_disp_order: int = None, fit_dispersion: bool | None = None, do_plot: bool = None, callback: callable = None, ): """ Main function for chromaticity measurment. :py:attr:`~pyaml.tuning_tools.measurement_tool.MeasurementTool.latest_measurement` contains: .. code-block:: python chromaticity:np.array # First order chromaticity, Array of [q'x,q'y] dispersion:np.array # First order dispersion, [[dx'0,..,dx'n],[dy'0,..,dy'n]] chromaticity_fit:np.array # Array of [[qx,qy],[q'x,q'y],[q''x,q''y],...] dispersionx_fit:np.array # Array of [[dx0],..,[dxn],[dx'0],..,[dx'n],...] dispersiony_fit:np.array # Array of [[dy0,..,dyn],[dy'0,..,dy'n],...] Parameters ---------- n_step: int Default number of RF step during chromaticity measurment [default: from config] alphac: float | None Moment compaction factor [default: from config] w_delta: float Default variation of relative energy during chromaticity measurment: f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac [default: from config] max_e_delta: float Maximum autorized variation of relative energy during chromaticity measurment [default: from config] n_avg_meas: int Default number of tune/orbit measurment per RF frequency [default: from config] sleep_between_meas: float Default time sleep between two tune measurment [default: from config] sleep_between_step: float Default time sleep after RF frequency variation [default: from config] fit_order: int Fitting order [default: from config] fit_disp_order : int, optional Dispersion fitting order [default: from config] fit_dispersion : bool, optionnal Fit dispersion, [default: from config] do_plot : bool Do you want to plot the fitting results ? callback: Callable, optional Callback is executed after each measurement or setting. If the callback return false, then the process is aborted. callback_data dict contains: .. code-block:: python source:MeasurementTool # Tool that triggered the callback step:int # The current step avg_step:int # The current averaging step rf:float # RF frequency used for the current step tune:np.array # The measured tune (on Action.MEASURE) orbit:np_array # The measured orbit, if fit_dispersion is True, (on Action.MEASURE) dtune:np.array # The tune variation (on Action.RESTORE) """ n_step = n_step if n_step is not None else self._cfg.n_step alphac = alphac if alphac is not None else self._alphac e_delta = e_delta if e_delta is not None else self._cfg.e_delta max_e_delta = max_e_delta if max_e_delta is not None else self._cfg.max_e_delta n_avg_meas = n_avg_meas if n_avg_meas is not None else self._cfg.n_avg_meas sleep_between_meas = sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas sleep_between_step = sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step fit_order = fit_order if fit_order is not None else self._cfg.fit_order fit_disp_order = fit_disp_order if fit_disp_order is not None else self._cfg.fit_disp_order fit_dispersion = fit_dispersion if fit_dispersion is not None else self._cfg.fit_dispersion if abs(e_delta) > abs(max_e_delta): logger.warning(f"e_delta={e_delta} is greater than max_e_delta={max_e_delta}") if alphac is None: raise PyAMLException("Moment compaction factor is not defined") self._init_measure("") self._register_callback(callback) # Get devices self.check_peer() tm = self.peer.get_betatron_tune_monitor(self._cfg.betatron_tune_name) rf = self.peer.get_rf_plant(self._cfg.rf_plant_name) bpms = None n_bpm = 0 orbit = None if fit_dispersion and fit_disp_order is not None and self._cfg.bpm_array_name is not None: # For dispersion fit bpms = self.peer.get_bpms(self._cfg.bpm_array_name) n_bpm = len(bpms) f0 = rf.frequency.get() delta = np.linspace(-e_delta, e_delta, n_step) delta_frec = -delta * alphac * f0 Q = np.zeros((n_step, 2)) if bpms is not None: orbit = np.zeros((n_step, n_bpm, 2)) # ensure that, even if there is an issus, the script will finish by # reseting the RF frequency to its original value err = None aborted = False try: for i, f in enumerate(delta_frec): # TODO : Use set_and_wait once it is implemented ! rf.frequency.set(f0 + f) self.send_callback(Action.APPLY, {"step": i, "rf": float(f0 + f)}) sleep(sleep_between_step) # Averaging for j in range(n_avg_meas): tune = tm.tune.get() Q[i] += tune cb_data = {"step": i, "avg_step": j, "rf": float(f0 + f), "tune": tune} if bpms is not None: orb = bpms.positions.get() orbit[i] += orb cb_data["orbit"] = orb self.send_callback(Action.MEASURE, cb_data) if j < n_avg_meas - 1: sleep(sleep_between_meas) Q[i] /= float(n_avg_meas) if bpms is not None: orbit[i] /= float(n_avg_meas) except Exception as ex: err = ex except KeyboardInterrupt as ex: aborted = True finally: # Restore rf.frequency.set(f0) self.send_callback(Action.RESTORE, {"step": i, "rf": f0}, raiseException=False) if err is not None: raise (err) if aborted: logger.warning(f"{self.get_name()} : measurement aborted") return False if fit_dispersion: self.fit(delta, Q, fit_order, orbit=orbit, fit_disp_order=fit_disp_order, do_plot=do_plot) else: self.fit(delta, Q, fit_order, do_plot=do_plot) return True
[docs] def fit(self, deltas, Q, order, orbit=None, fit_disp_order=None, do_plot=False): """ Compute chromaticity (and dispersion) from input data and update :py:attr:`~pyaml.tuning_tools.measurement_tool.MeasurementTool.latest_measurement`. Parameters ---------- deltas : array of float Relative energy (delta) variation steps done. Q : array of [Qx,Qy] Horizontal,Vertical tune measured. orbit: array of [[x0,y0],[x1,y1],...] fit_order: int Chromaticity fitting order fit_disp_order : int, optional Dispersion fitting order plot : bool, optional If True, plot the fit. """ chroma = np.polynomial.polynomial.polyfit(deltas, Q, order).T self.latest_measurement["chromaticity_fit"] = chroma self.latest_measurement["chromaticity"] = chroma[:, 1] # First order chroma if orbit is not None: dispx = np.polynomial.polynomial.polyfit(deltas, orbit[:, :, 0], fit_disp_order).T self.latest_measurement["dispersionx_fit"] = dispx dispy = np.polynomial.polynomial.polyfit(deltas, orbit[:, :, 1], fit_disp_order).T self.latest_measurement["dispersiony_fit"] = dispy self.latest_measurement["dispersion"] = [dispx[:, 1], dispy[:, 1]] # First order dispersion if do_plot: if fit_disp_order is None: fig = plt.figure("Chromaticity measurement") cols = 1 else: fig = plt.figure("Chromaticity/Dispersion measurement") cols = 2 for i in range(2): ax = fig.add_subplot(2, cols, 1 + i) ax.scatter(deltas * 100, Q[:, i]) title = "" for o in range(order, -1, -1): dp = "" if o == 1: dp = "dp/p" elif o >= 1: dp = f"(dp/p)$^{o}$" title += f"{chroma[i][o]:.4f} {dp}" if o != 0: title += " + " ax.plot(deltas * 100, np.polyval(chroma[i][::-1], deltas)) ax.set_title(title) ax.set_ylabel("%s Tune" % ["Horizontal", "Vertical"][i]) ax.set_xlabel("Momentum Shift, dp/p [%]") if fit_disp_order is not None: ax = fig.add_subplot(2, cols, 3) ax.plot(dispx[:, 1]) ax.set_ylabel("Dispersion [m]") ax = fig.add_subplot(2, cols, 4) ax.plot(dispy[:, 1]) ax.set_xlabel("BPM #") ax.set_ylabel("Dispersion [m]") fig.tight_layout() plt.show()