Source code for pyaml.tuning_tools.chromaticity_response_matrix

import logging
import time
from typing import Callable, Optional

import numpy as np
from pydantic import ConfigDict

from ..common.constants import Action
from ..common.element import ElementConfigModel
from .measurement_tool import MeasurementTool, MeasurementToolConfigModel
from .response_matrix_data import ConfigModel as ResponseMatrixDataConfigModel

logger = logging.getLogger(__name__)

PYAMLCLASS = "ChromaticityResponseMatrix"


[docs] class ConfigModel(MeasurementToolConfigModel): """ Configuration model for Tune response matrix Parameters ---------- sextu_array_name : str Array name of sextupole used to adjust the chromaticity chromaticity_name : str Name of the diagnostic chromaticy monitor sextu_delta : float Delta strength used to get the response matrix """ model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") sextu_array_name: str chromaticity_name: str sextu_delta: float
[docs] class ChromaticityResponseMatrix(MeasurementTool): def __init__(self, cfg: ConfigModel): super().__init__(cfg.name) self._cfg = cfg self.aborted = False
[docs] def measure( self, sextu_delta: Optional[float] = None, n_step: Optional[int] = None, sleep_between_step: Optional[float] = None, n_avg_meas: Optional[int] = None, sleep_between_meas: Optional[float] = None, callback: Optional[Callable] = None, ): """ Measure chromaticity response matrix. :py:attr:`~pyaml.tuning_tools.measurement_tool.MeasurementTool.latest_measurement` contains: .. code-block:: python matrix:list[list[float] # The response matrix variable_names:list[str] # Variable names observable_names:list[str] # Observables names **Example** .. code-block:: python from pyaml.accelerator import Accelerator from pyaml.common.constants import Action def callback(action: Action, data:dict): print(f"{action}, data:{data}") return True sr = Accelerator.load("tests/config/EBSOrbit.yaml") acc = sr.design if acc.crm.measure(callback=callback): acc.crm.save("ideal_crm.json") acc.crm.save("ideal_crm.yaml", with_type="yaml") acc.crm.save("ideal_crm.npz", with_type="npz") Parameters ---------- sextu_delta : float Delta strength used to get the response matrix n_step: int, optional Number of step for fitting the chomaticity slope [-sextu_delta/n_step..sextu_delta/n_step] Default from config sleep_between_step: float Default time sleep after sextu exitation Default: from config n_avg_meas : int, optional Default number of chromaticity measurement per step used for averaging Default from config sleep_between_meas: float Default time sleep between two chomaticity measurment Default: from config callback : Callable, optional Callback executed after each strength setting or measurement. See :py:meth:`~.measurement_tool.MeasurementTool.send_callback`. If the callback return false, then the scan is aborted and strength restored. callback_data dict contains: .. code-block:: python source:MeasurementTool # Tool that triggered the callback idx:int # The index in the element array being processed step:int # The current step avg_step:int # The current averaging step magnet:str # The magnet being excited strength:float # Magnet strength chroma:np.array # The measured chroma (on Action.MEASURE) dchroma:np.array # The chroma variation (on Action.RESTORE) """ # Get devices self.check_peer() sextus = self._peer.get_magnets(self._cfg.sextu_array_name) cm = self._peer.get_chromaticity_monitor(self._cfg.chromaticity_name) self._register_callback(callback) self._init_measure("pyaml.tuning_tools.response_matrix_data") chromamat = np.zeros((len(sextus), 2)) initial_chroma = None if not cm.measure(callback=callback): # Aborted return False initial_chroma = cm.chromaticity.get() delta = sextu_delta if sextu_delta is not None else self._cfg.sextu_delta nb_step = n_step if n_step is not None else self._cfg.n_step nb_meas = n_avg_meas if n_avg_meas is not None else self._cfg.n_avg_meas sleep_step = sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step sleep_meas = sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas err = None aborted = False try: for qidx, m in enumerate(sextus): str = m.strength.get() # Initial strength deltas = np.linspace(-delta, delta, nb_step) Qp = np.zeros((nb_step, 2)) for step, d in enumerate(deltas): # apply strength m.strength.set(str + d) self.send_callback( Action.APPLY, {"idx": qidx, "step": step, "magnet": m.get_name(), "strength": float(str + d)} ) time.sleep(sleep_step) # Chroma averaging Qp[step] = np.zeros(2) for avg in range(nb_meas): if not cm.measure(callback=callback): raise KeyboardInterrupt chroma = cm.chromaticity.get() Qp[step] += chroma self.send_callback( Action.MEASURE, {"idx": qidx, "step": step, "avg_step": avg, "magnet": m.get_name(), "chroma": chroma}, ) if avg < nb_meas - 1: time.sleep(sleep_meas) Qp[step] /= float(nb_meas) # Fit and fill matrix with the slopes if nb_step == 1: chromamat[qidx] = (Qp - initial_chroma) / deltas[0] else: coefs = np.polynomial.polynomial.polyfit(deltas, Qp, 1) chromamat[qidx] = coefs[1] # Restore strength m.strength.set(str) self.send_callback( Action.RESTORE, {"idx": qidx, "magnet": m.get_name(), "strength": float(str), "dchroma": chromamat[qidx]}, ) except Exception as ex: err = ex except KeyboardInterrupt as ex: aborted = True finally: # Restore strength m.strength.set(str) self.send_callback( Action.RESTORE, {"step": qidx, "magnet": m.get_name(), "strength": float(str), "dchroma": chromamat[qidx]}, raiseException=False, ) if err is not None: raise (err) if aborted: logger.warning(f"{self.get_name()} : measurement aborted") return False mat = ResponseMatrixDataConfigModel( matrix=chromamat.T.tolist(), variable_names=sextus.names(), observable_names=[cm.get_name() + ".x", cm.get_name() + ".y"], ) self.latest_measurement.update(mat.model_dump()) self.latest_measurement["type"] = "pyaml.tuning_tools.response_matrix_data" return True