Source code for pyaml.lattice.simulator

from pathlib import Path

import at
from pydantic import BaseModel, ConfigDict

from ..bpm.bpm import BPM
from ..common.abstract_aggregator import ScalarAggregator
from ..common.element import Element
from ..common.element_holder import ElementHolder
from ..common.exception import PyAMLException
from ..configuration import get_root_folder
from ..diagnostics.tune_monitor import BetatronTuneMonitor
from ..lattice.abstract_impl import (
    BPMHScalarAggregator,
    BPMScalarAggregator,
    BPMVScalarAggregator,
    RBetatronTuneArray,
    RBpmArray,
    RWBpmOffsetArray,
    RWBpmTiltScalar,
    RWHardwareArray,
    RWHardwareScalar,
    RWRFATFrequencyScalar,
    RWRFATotalVoltageScalar,
    RWRFFrequencyScalar,
    RWRFPhaseScalar,
    RWRFVoltageScalar,
    RWSerializedHardware,
    RWSerializedStrength,
    RWStrengthArray,
    RWStrengthScalar,
)
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from ..magnet.magnet import Magnet
from ..magnet.serialized_magnet import SerializedMagnets
from ..rf.rf_plant import RFPlant, RWTotalVoltage
from ..rf.rf_transmitter import RFTransmitter
from ..tuning_tools.measurement_tool import MeasurementTool
from ..tuning_tools.tuning_tool import TuningTool
from .attribute_linker import (
    ConfigModel as PyAtAttrLinkerConfigModel,
)
from .attribute_linker import (
    PyAtAttributeElementsLinker,
)
from .lattice_elements_linker import LatticeElementsLinker

# Define the main class name for this module
PYAMLCLASS = "Simulator"


[docs] class ConfigModel(BaseModel): """ Configuration model for Simulator Parameters ---------- name : str Simulator name lattice : str AT lattice file mat_key : str, optional AT lattice ring name linker : LatticeElementsLinker, optional The linker configuration model description : str , optional Simulator description """ model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") name: str lattice: str mat_key: str = None linker: LatticeElementsLinker = None description: str | None = None
[docs] class Simulator(ElementHolder): """ Class that implements access to AT simulator """ def __init__(self, cfg: ConfigModel): super().__init__() self._cfg = cfg path: Path = get_root_folder() / cfg.lattice if self._cfg.mat_key is None: self.ring = at.load_lattice(path) else: self.ring = at.load_lattice(path, mat_key=f"{self._cfg.mat_key}") self._linker = cfg.linker if self._linker: self._linker.set_lattice(self.ring) else: self._elements_indexing = {} for e in self.ring: if e.FamName in self._elements_indexing: self._elements_indexing[e.FamName].append(e) else: self._elements_indexing[e.FamName] = [e]
[docs] def name(self) -> str: return self._cfg.name
[docs] def get_lattice(self) -> at.Lattice: return self.ring
[docs] def get_description(self) -> str: """ Returns the description of the accelerator """ return self._cfg.description
[docs] def create_magnet_strength_aggregator(self, magnets: list[Magnet]) -> ScalarAggregator: # No magnet aggregator for simulator return None
[docs] def create_magnet_hardware_aggregator(self, magnets: list[Magnet]) -> ScalarAggregator: # No magnet aggregator for simulator return None
[docs] def create_bpm_aggregators(self, bpms: list[BPM]) -> list[ScalarAggregator]: agg = BPMScalarAggregator(self.get_lattice()) aggh = BPMHScalarAggregator(self.get_lattice()) aggv = BPMVScalarAggregator(self.get_lattice()) for b in bpms: e = self.get_at_elems(b)[0] agg.add_elem(e) aggh.add_elem(e) aggv.add_elem(e) return [agg, aggh, aggv]
[docs] def fill_device(self, elements: list[Element]): for e in elements: # Need conversion to physics unit to work with simulator if isinstance(e, Magnet): current = RWHardwareScalar(self.get_at_elems(e), e.polynom, e.model) if e.model.has_physics() else None strength = RWStrengthScalar(self.get_at_elems(e), e.polynom, e.model) if e.model.has_physics() else None # Create a unique ref for this simulator m = e.attach(self, strength, current) self.add_magnet(m) elif isinstance(e, CombinedFunctionMagnet): currents = RWHardwareArray(self.get_at_elems(e), e.polynoms, e.model) if e.model.has_physics() else None strengths = RWStrengthArray(self.get_at_elems(e), e.polynoms, e.model) if e.model.has_physics() else None # Create unique refs of each function for this simulator ms = e.attach(self, strengths, currents) self.add_cfm_magnet(ms[0]) for m in ms[1:]: self.add_magnet(m) elif isinstance(e, SerializedMagnets): currents = [] strengths = [] # Create unique refs the series and each of its function for this # control system # Link hardware to strengths and bind strength together for index, magnet in enumerate(e.get_magnets()): current = ( RWHardwareScalar( self.get_at_elems(magnet), e.polynom, e.model.get_sub_model(index), ) if e.model.has_hardware() else None ) strength = ( RWStrengthScalar( self.get_at_elems(magnet), e.polynom, e.model.get_sub_model(index), ) if e.model.has_physics() else None ) currents.append(current) strengths.append(strength) linked_currents = [] linked_strengths = [] for i in range(e.get_nb_magnets()): current = RWSerializedHardware(currents, i) if e.model.has_hardware() else None strength = RWSerializedStrength(strengths, currents, i) if e.model.has_physics() else None linked_currents.append(current) linked_strengths.append(strength) ms = e.attach(self, linked_strengths, linked_currents) self.add_serialized_magnet(ms[0]) for m in ms[1:]: self.add_magnet(m) elif isinstance(e, BPM): # This assumes unique BPM names in the pyAT lattice tilt = RWBpmTiltScalar(self.get_at_elems(e)[0]) offsets = RWBpmOffsetArray(self.get_at_elems(e)[0]) positions = RBpmArray(self.get_at_elems(e)[0], self.ring) e = e.attach(self, positions, offsets, tilt) self.add_bpm(e) elif isinstance(e, RFPlant): if e._cfg.transmitters: cavs: list[at.Element] = [] harmonics: list[float] = [] attachedTrans: list[RFTransmitter] = [] for t in e._cfg.transmitters: cavsPerTrans: list[at.Element] = [] for c in t._cfg.cavities: # Expect unique name for cavities cav = self.get_at_elems(Element(c)) if len(cav) > 1: raise PyAMLException(f"RF transmitter {t.get_name()},multiple cavity definition:{{cav[0]}}") if len(cav) == 0: raise PyAMLException(f"RF transmitter {t.get_name()}, No cavity found") cavsPerTrans.append(cav[0]) harmonics.append(t._cfg.harmonic) voltage = RWRFVoltageScalar(cavsPerTrans) phase = RWRFPhaseScalar(cavsPerTrans) nt = t.attach(self, voltage, phase) self.add_rf_transnmitter(nt) cavs.extend(cavsPerTrans) attachedTrans.append(nt) frequency = RWRFFrequencyScalar(cavs, harmonics) voltage = RWTotalVoltage(attachedTrans) ne = e.attach(self, frequency, voltage) self.add_rf_plant(ne) else: # No transmitter defined switch to AT methods frequency = RWRFATFrequencyScalar(self.ring) voltage = RWRFATotalVoltageScalar(self.ring) ne = e.attach(self, frequency, voltage) self.add_rf_plant(ne) elif isinstance(e, BetatronTuneMonitor): betatron_tune = RBetatronTuneArray(self.ring) e = e.attach(self, betatron_tune) self.add_betatron_tune_monitor(e) elif isinstance(e, MeasurementTool) | isinstance(e, TuningTool): self.add_tool(e.attach(self))
[docs] def get_names(self, element: Element) -> list[str] | None: """ Parse element lattice_name syntax. see Element.ConfigModel.lattice_name. """ pattern = element.get_lattice_names() if pattern is None: return None if pattern.startswith("list("): try: return pattern[5:-1].rsplit(",") except Exception as err: strErr = f"{element.get_name()}: Invalid lattice_names syntax " strErr += f"for {pattern}, {str(err)}" raise PyAMLException(strErr) from err return None
[docs] def get_indices(self, element: Element) -> (str | None, list[int] | None): """ Parse element lattice_name syntax. see Element.ConfigModel.lattice_name. """ pattern = element.get_lattice_names() if pattern is None: return (element.get_name(), None) # [name]@idx[,idx] syntax split = pattern.rfind("@") if split >= 0: try: name = pattern[:split] l = pattern[split + 1 :] lidx = l.rsplit(",") rlist = list(map(int, lidx)) return (name if len(name) > 0 else None, rlist) except Exception as err: strErr = f"{element.get_name()}: Invalid lattice_names syntax " strErr += f"for {pattern}, {str(err)}" raise PyAMLException(strErr) from err # [name]#start_idx..end_idx syntax split = pattern.rfind("#") if split >= 0: try: name = pattern[:split] l = pattern[split + 1 :] lrange = l.rsplit("..") sl = list(map(int, lrange)) rlist = range(sl[0], sl[1]) return (name if len(name) > 0 else None, rlist) except Exception as err: strErr = f"{element.get_name()}: Invalid lattice_names syntax " strErr += f"for {pattern}, {str(err)}" raise PyAMLException(strErr) from err return (element.get_name(), None)
[docs] def get_at_elems(self, element: Element) -> list[at.Element]: if self._linker: identifier = self._linker.get_element_identifier(element) element_list = self._linker.get_at_elements(identifier) if not element_list: raise PyAMLException(f"{identifier} not found in lattice:{self._cfg.lattice}") return element_list else: # By list nameList = self.get_names(element) if nameList is not None: names = [] for name in nameList: if name not in self._elements_indexing: raise PyAMLException(f"{name} not found in lattice:{self._cfg.lattice}") elts = self._elements_indexing[name] names.extend(elts) return names # By name or indices name, indices = self.get_indices(element) if name is None: # Direct indexing in the ring return [self.ring[idx] for idx in indices] else: if name not in self._elements_indexing: raise PyAMLException(f"{name} not found in lattice:{self._cfg.lattice}") elts = self._elements_indexing[name] if indices is None: return elts else: return [elts[idx] for idx in indices]
def __repr__(self): return repr(self._cfg).replace("ConfigModel", self.__class__.__name__)