Source code for pyaml.common.element_holder

"""
Module handling element references for simulators and control system
"""

import fnmatch
import re
from typing import TYPE_CHECKING

from ..arrays.bpm_array import BPMArray
from ..arrays.cfm_magnet_array import CombinedFunctionMagnetArray
from ..arrays.element_array import ElementArray
from ..arrays.magnet_array import MagnetArray
from ..arrays.serialized_magnet_array import SerializedMagnetsArray
from ..bpm.bpm import BPM
from ..common.exception import PyAMLException
from ..diagnostics.chromaticity_monitor import ChomaticityMonitor
from ..diagnostics.tune_monitor import BetatronTuneMonitor
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from ..magnet.magnet import Magnet
from ..magnet.serialized_magnet import SerializedMagnets
from ..rf.rf_plant import RFPlant
from ..rf.rf_transmitter import RFTransmitter
from .element import Element

if TYPE_CHECKING:
    from ..accelerator import Accelerator
    from ..tuning_tools.chromaticity import Chromaticity
    from ..tuning_tools.chromaticity_response_matrix import ChromaticityResponseMatrix
    from ..tuning_tools.dispersion import Dispersion
    from ..tuning_tools.orbit import Orbit
    from ..tuning_tools.orbit_response_matrix import OrbitResponseMatrix
    from ..tuning_tools.tune import Tune
    from ..tuning_tools.tune_response_matrix import TuneResponseMatrix


[docs] class ElementHolder(object): """ Class that store references of objects used from both simulators and control system """ def __init__(self): # Device handle self.__MAGNETS: dict[str, Magnet] = {} self.__CFM_MAGNETS: dict[str, CombinedFunctionMagnet] = {} self.__SERIALIZED_MAGNETS: dict[str, SerializedMagnets] = {} self.__BPMS: dict[str, BPM] = {} self.__RFPLANT: dict[str, RFPlant] = {} self.__RFTRANSMITTER: dict[str, RFTransmitter] = {} self.__DIAG: dict[str, Element] = {} self.__TUNING_TOOLS: dict[str, Element] = {} self.__ALL: dict[str, Element] = {} self.__by_class_elements: dict[type, dict] = { Magnet: self.__MAGNETS, CombinedFunctionMagnet: self.__CFM_MAGNETS, SerializedMagnets: self.__SERIALIZED_MAGNETS, BPM: self.__BPMS, RFPlant: self.__RFPLANT, RFTransmitter: self.__RFTRANSMITTER, } # Array handle self.__MAGNET_ARRAYS: dict = {} self.__CFM_MAGNET_ARRAYS: dict = {} self.__SERIALIZED_MAGNETS_ARRAYS: dict = {} self.__BPM_ARRAYS: dict = {} self.__ELEMENT_ARRAYS: dict = {} @property def peer(self) -> "Accelerator": """ Returns the peer accelerator of this holder """ return self._peer
[docs] def post_init(self): """ Method triggered after all initialisations are done """ for e in self.get_all_elements(): e.post_init()
[docs] def fill_device(self, elements: list[Element]): raise PyAMLException("ElementHolder.fill_device() is not subclassed")
[docs] def find_elements(self, filter: str) -> list[str]: if filter.startswith("re:"): pattern = re.compile(rf"{filter[3:]}") elements = [k for k in self.__ALL.keys() if pattern.fullmatch(k)] elif "*" in filter or "?" in filter: elements = [k for k in self.__ALL.keys() if fnmatch.fnmatch(k, filter)] else: elements = [filter] return elements
[docs] def fill_array( self, array_name: str, element_names: list[str], get_func, constructor, ARR: dict, ): # Handle wildcard, regexp and exclusion pattern all_names: list[str] = [] excluded_names: list[str] = [] for name in element_names: if name.startswith("~"): names = self.find_elements(name[1:]) excluded_names.extend(names) else: names = self.find_elements(name) all_names.extend(names) [all_names.remove(name) for name in excluded_names] a = [] for n in all_names: try: m = get_func(n) except Exception as err: raise PyAMLException(f"{constructor.__name__} {array_name} : {err} @index {len(a)}") from None if m in a: raise PyAMLException(f"{constructor.__name__} {array_name} : duplicate name {name} @index {len(a)}") from None a.append(m) ARR[array_name] = constructor(array_name, a)
def __add(self, array, element: Element): if element.get_name() in self.__ALL: # Ensure name unicity raise PyAMLException(f"Duplicate element {element.__class__.__name__} name {{element.get_name()}}") from None array[element.get_name()] = element self.__ALL[element.get_name()] = element def __get(self, what, name, array) -> Element: if name not in array: raise PyAMLException(f"{what} {name} not defined") return array[name] # Generic elements
[docs] def fill_element_array(self, arrayName: str, elementNames: list[str]): self.fill_array( arrayName, elementNames, self.get_element, ElementArray, self.__ELEMENT_ARRAYS, )
[docs] def add_element(self, element: Element): self.__ALL[element.get_name()] = element
[docs] def get_element(self, name: str) -> Element: return self.__get("Element", name, self.__ALL)
[docs] def get_elements(self, name: str) -> ElementArray: return self.__get("Element array", name, self.__ELEMENT_ARRAYS)
[docs] def get_all_elements(self) -> list[Element]: return [value for key, value in self.__ALL.items()]
# Magnets
[docs] def fill_magnet_array(self, arrayName: str, elementNames: list[str]): self.fill_array(arrayName, elementNames, self.get_magnet, MagnetArray, self.__MAGNET_ARRAYS)
[docs] def get_magnet(self, name: str) -> Magnet: return self.__get("Magnet", name, self.__MAGNETS)
[docs] def add_magnet(self, m: Magnet): self.__add(self.__MAGNETS, m)
[docs] def get_magnets(self, name: str) -> MagnetArray: return self.__get("Magnet array", name, self.__MAGNET_ARRAYS)
[docs] def get_all_magnets(self) -> list[Magnet]: return [value for key, value in self.__MAGNETS.items()]
# Combined Function Magnets
[docs] def fill_cfm_magnet_array(self, arrayName: str, elementNames: list[str]): self.fill_array( arrayName, elementNames, self.get_cfm_magnet, CombinedFunctionMagnetArray, self.__CFM_MAGNET_ARRAYS, )
[docs] def get_cfm_magnet(self, name: str) -> Magnet: return self.__get("CombinedFunctionMagnet", name, self.__CFM_MAGNETS)
[docs] def add_cfm_magnet(self, m: Magnet): self.__add(self.__CFM_MAGNETS, m)
[docs] def get_cfm_magnets(self, name: str) -> CombinedFunctionMagnetArray: return self.__get("CombinedFunctionMagnet array", name, self.__CFM_MAGNET_ARRAYS)
[docs] def get_all_cfm_magnets(self) -> list[CombinedFunctionMagnet]: return [value for key, value in self.__CFM_MAGNETS.items()]
# Serialized magnets
[docs] def fill_serialized_magnet_array(self, arrayName: str, elementNames: list[str]): self.fill_array( arrayName, elementNames, self.get_serialized_magnet, SerializedMagnetsArray, self.__SERIALIZED_MAGNETS_ARRAYS, )
[docs] def get_serialized_magnet(self, name: str) -> Magnet: return self.__get("SerializedMagnets", name, self.__SERIALIZED_MAGNETS)
[docs] def add_serialized_magnet(self, m: Magnet): self.__add(self.__SERIALIZED_MAGNETS, m)
[docs] def get_serialized_magnets(self, name: str) -> SerializedMagnetsArray: return self.__get("SerializedMagnets array", name, self.__SERIALIZED_MAGNETS_ARRAYS)
[docs] def get_all_serialized_magnets(self) -> list[SerializedMagnets]: return [value for key, value in self.__SERIALIZED_MAGNETS.items()]
# BPMs
[docs] def fill_bpm_array(self, arrayName: str, elementNames: list[str]): self.fill_array( arrayName, elementNames, self.get_bpm, BPMArray, self.__BPM_ARRAYS, )
[docs] def get_bpm(self, name: str) -> Element: return self.__get("BPM", name, self.__BPMS)
[docs] def add_bpm(self, bpm: BPM): self.__add(self.__BPMS, bpm)
[docs] def get_bpms(self, name: str) -> BPMArray: return self.__get("BPM array", name, self.__BPM_ARRAYS)
[docs] def get_all_bpms(self) -> list[BPM]: return [value for key, value in self.__BPMS.items()]
# RF
[docs] def get_rf_plant(self, name: str) -> RFPlant: return self.__get("RFPlant", name, self.__RFPLANT)
[docs] def add_rf_plant(self, rf: RFPlant): self.__add(self.__RFPLANT, rf)
[docs] def add_rf_transnmitter(self, rf: RFTransmitter): self.__add(self.__RFTRANSMITTER, rf)
[docs] def get_rf_trasnmitter(self, name: str) -> RFTransmitter: return self.__get("RFTransmitter", name, self.__RFTRANSMITTER)
# Tune monitor
[docs] def get_betatron_tune_monitor(self, name: str) -> BetatronTuneMonitor: return self.__get("Diagnostic", name, self.__DIAG)
[docs] def add_betatron_tune_monitor(self, tune_monitor: Element): self.__add(self.__DIAG, tune_monitor)
# Tuning/Measurement tools
[docs] def add_tool(self, tool: Element): self.__add(self.__TUNING_TOOLS, tool)
# ---- Chromaticity -------------------------------------------------
[docs] def get_chromaticity_monitor(self, name: str) -> ChomaticityMonitor: obj = self.__get("Chomaticity monitor", name, self.__TUNING_TOOLS) return obj
[docs] def get_chromaticity_tuning(self, name: str) -> "Chromaticity": return self.__get("Chromaticity tool", name, self.__TUNING_TOOLS)
[docs] def get_crm_tuning(self, name: str) -> "ChromaticityResponseMatrix": return self.__get("ChromaticityResponseMatrix tool", name, self.__TUNING_TOOLS)
@property def chromaticity(self) -> "Chromaticity": return self.get_chromaticity_tuning("DEFAULT_CHROMATICITY_CORRECTION") @property def crm(self) -> "ChromaticityResponseMatrix": return self.get_crm_tuning("DEFAULT_CHROMATICITY_RESPONSE_MATRIX") # ---- Tune ---------------------------------------------------------
[docs] def get_tune_tuning(self, name: str) -> "Tune": return self.__get("Tune tuning tool", name, self.__TUNING_TOOLS)
@property def tune(self) -> "Tune": return self.get_tune_tuning("DEFAULT_TUNE_CORRECTION")
[docs] def get_trm_tuning(self, name: str) -> "TuneResponseMatrix": return self.__get("TuneResponseMatrix tool", name, self.__TUNING_TOOLS)
@property def trm(self) -> "TuneResponseMatrix": return self.get_trm_tuning("DEFAULT_TUNE_RESPONSE_MATRIX") # ---- Orbit --------------------------------------------------------
[docs] def get_orbit_tuning(self, name: str) -> "Orbit": return self.__get("Orbit tuning tool", name, self.__TUNING_TOOLS)
@property def orbit(self) -> "Orbit": return self.get_orbit_tuning("DEFAULT_ORBIT_CORRECTION")
[docs] def get_orm_tuning(self, name: str) -> "OrbitResponseMatrix": return self.__get("OrbitResponseMatrix tool", name, self.__TUNING_TOOLS)
@property def orm(self) -> "OrbitResponseMatrix": return self.get_orm_tuning("DEFAULT_ORBIT_RESPONSE_MATRIX") # ---- Dispersive orbit --------------------------------------------
[docs] def get_dispersion_tuning(self, name: str) -> "Dispersion": return self.__get("Dispersion tool", name, self.__TUNING_TOOLS)
@property def dispersion(self) -> "Dispersion": return self.get_dispersion_tuning("DEFAULT_DISPERSION") def _get_array(self, name: str): """ Generic array resolver used by YellowPages. The method returns the array object referenced by 'name', regardless of its concrete type. """ if name in self.__BPM_ARRAYS: return self.__BPM_ARRAYS[name] if name in self.__MAGNET_ARRAYS: return self.__MAGNET_ARRAYS[name] if name in self.__CFM_MAGNET_ARRAYS: return self.__CFM_MAGNET_ARRAYS[name] if name in self.__SERIALIZED_MAGNETS_ARRAYS: return self.__SERIALIZED_MAGNETS_ARRAYS[name] if name in self.__ELEMENT_ARRAYS: return self.__ELEMENT_ARRAYS[name] raise PyAMLException(f"Array {name} not defined") def _get_tool(self, name: str): """ Generic tuning tool resolver used by YellowPages. """ if name not in self.__TUNING_TOOLS: raise PyAMLException(f"Tool {name} not defined") return self.__TUNING_TOOLS[name] def _get_diagnostic(self, name: str): """ Generic diagnostic resolver used by YellowPages. """ if name not in self.__DIAG: raise PyAMLException(f"Diagnostic {name} not defined") return self.__DIAG[name] def _list_arrays(self) -> list[str]: """ Return all array identifiers available in this holder. """ arrays: list[str] = [] arrays.extend(self.__BPM_ARRAYS.keys()) arrays.extend(self.__MAGNET_ARRAYS.keys()) arrays.extend(self.__CFM_MAGNET_ARRAYS.keys()) arrays.extend(self.__SERIALIZED_MAGNETS_ARRAYS.keys()) arrays.extend(self.__ELEMENT_ARRAYS.keys()) return arrays def _list_tools(self) -> list[str]: """ Return all tuning tool identifiers available in this holder. """ return list(self.__TUNING_TOOLS.keys()) def _list_diagnostics(self) -> list[str]: """ Return all diagnostic identifiers available in this holder. """ return list(self.__DIAG.keys()) def _set_energy(self, E: float): """ Sets the energy on all elements Parameters ---------- E : float Energy in eV """ # Needed by energy dependant element (i.e. magnet coil current calculation) for m in self.get_all_elements(): m.set_energy(E) def _set_mcf(self, alphac: float): """ Sets the moment compaction factor on all elements Parameters ---------- alphac : float Moment compaction factor """ # Needed by some off energy dependant element (i.e. chromaticty tools) for m in self.get_all_elements(): m.set_mcf(alphac) def _set_harmonic(self, h: int): """ Sets the harmonic number (number of bucket) on elements Parameters ---------- h : int Harmonic number """ for m in self.get_all_elements(): m.set_harmonic(h)