Source code for pyaml.tuning_tools.orbit_response_matrix

import logging
from pathlib import Path
from typing import Callable, List, Optional, Self

import pySC
from pydantic import ConfigDict
from pySC.apps import measure_ORM
from pySC.apps.codes import ResponseCode

from ..common.constants import Action
from ..external.pySC_interface import pySCInterface
from .measurement_tool import MeasurementTool, MeasurementToolConfigModel
from .orbit_response_matrix_data import ConfigModel as OrbitResponseMatrixDataConfigModel

logger = logging.getLogger(__name__)

PYAMLCLASS = "OrbitResponseMatrix"


[docs] class ConfigModel(MeasurementToolConfigModel): """ Configuration model for orbit response matrix measurement Parameters ---------- bpm_array_name : str BPM array name hcorr_array_name : str Horizontal corrector array name vcorr_array_name : str Vertical corrector array name corrector_delta : float Corrector delta for measurement """ model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") bpm_array_name: str hcorr_array_name: str vcorr_array_name: str corrector_delta: float
[docs] class OrbitResponseMatrix(MeasurementTool): def __init__(self, cfg: ConfigModel): super().__init__(cfg.name) self._cfg = cfg self.bpm_array_name = cfg.bpm_array_name self.hcorr_array_name = cfg.hcorr_array_name self.vcorr_array_name = cfg.vcorr_array_name self.corrector_delta = cfg.corrector_delta
[docs] def measure( self, corrector_names: Optional[List[str]] = None, sleep_between_step: Optional[float] = None, n_avg_meas: Optional[int] = None, sleep_between_meas: Optional[float] = None, callback: Optional[Callable] = None, ): """ Measure orbit response matrix. **Example** .. code-block:: python sr = Accelerator.load("MyAccelerator.yaml") acc = sr.design if acc.orm.measure(): acc.orm.save("ideal_orm.json") acc.orm.save("ideal_orm.yaml", with_type="yaml") acc.orm.save("ideal_orm.npz", with_type="npz") Parameters ---------- sleep_between_step: float Default time sleep after steerer exitation Default: from config n_avg_meas : int, optional Default number of orbit measurement per step used for averaging Default from config sleep_between_meas: float Default time sleep between two orbit measurment Default: from config callback : Callable, optional example: callback(action:int, callback_data: 'Complicated struct') callback is executed after each strength setting and after each orbit reading. If the callback returns false, then the process is aborted. """ nb_meas = n_avg_meas if n_avg_meas is not None else self._cfg.n_avg_meas sleep_step = sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step sleep_meas = sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas element_holder = self._peer interface = pySCInterface( element_holder=element_holder, bpm_array_name=self.bpm_array_name, ) # TODO handle sleep_meas interface.set_wait_time = sleep_step if corrector_names is None: logger.info(f"Measuring correctors from the default arrays: {self.hcorr_array_name} and {self.vcorr_array_name}.") hcorrector_names = element_holder.get_magnets(self.hcorr_array_name).names() vcorrector_names = element_holder.get_magnets(self.vcorr_array_name).names() corrector_names = hcorrector_names + vcorrector_names generator = measure_ORM( interface=interface, corrector_names=corrector_names, delta=self.corrector_delta, skip_save=True, shots_per_orbit=nb_meas, ) pySC.disable_pySC_rich() aborted = False err = None idx = 0 try: self._register_callback(callback) self._init_measure() for code, measurement in generator: callback_data = {"idx": idx, "response_data": measurement.response_data} if code is ResponseCode.AFTER_SET: self.send_callback(Action.APPLY, callback_data) elif code is ResponseCode.AFTER_GET: self.send_callback(Action.MEASURE, callback_data) elif code is ResponseCode.AFTER_RESTORE: logger.info(f"Measured response of {measurement.last_input}.") self.send_callback(Action.RESTORE, callback_data) idx += 1 except Exception as ex: err = ex except KeyboardInterrupt as ex: aborted = True finally: # Restore steerer strength # TODO self.send_callback( Action.RESTORE, {"idx": idx}, raiseException=False, ) if err is not None: raise (err) if aborted: logger.warning(f"{self.get_name()} : measurement aborted (settings not restored)") return False orm_data = self._pySC_response_data_to_ORMData(measurement.response_data.model_dump()) self.latest_measurement.update(orm_data.model_dump()) self.latest_measurement["type"] = "pyaml.tuning_tools.orbit_response_matrix_data" return True
def _pySC_response_data_to_ORMData(self, data: dict) -> OrbitResponseMatrixDataConfigModel: # all metadata is discarded here. Should we keep something? element_holder = self._peer all_hcorrector_names = element_holder.get_magnets(self.hcorr_array_name).names() all_vcorrector_names = element_holder.get_magnets(self.vcorr_array_name).names() variable_planes = [] for corr in data["input_names"]: if corr in all_hcorrector_names: variable_planes.append("H") elif corr in all_vcorrector_names: variable_planes.append("V") bpm_names = element_holder.get_bpms(self.bpm_array_name).names() # This is because we assume always dual-plane bpms now. len_b = len(bpm_names) observable_names = bpm_names * 2 observable_planes = ["H"] * len_b + ["V"] * len_b orm_data_model = { "matrix": data["matrix"], "variable_names": data["input_names"], "observable_names": observable_names, "rf_response": None, "variable_planes": variable_planes, "observable_planes": observable_planes, } orm_data = OrbitResponseMatrixDataConfigModel(**orm_data_model) return orm_data