Source code for pyaml.lattice.abstract_impl

import at
import numpy as np
from numpy.typing import NDArray
from scipy.constants import speed_of_light

from ..common import abstract
from ..common.abstract_aggregator import ScalarAggregator
from ..common.exception import PyAMLException
from ..magnet.model import MagnetModel
from ..rf.rf_plant import RFPlant
from ..rf.rf_transmitter import RFTransmitter
from .polynom_info import PolynomInfo

# TODO handle serialized magnets for magnet array

# ------------------------------------------------------------------------------


[docs] class RWHardwareScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to a magnet of a simulator in hardware unit. Hardware unit is converted from strength using the magnet model """ def __init__(self, elements: list[at.Element], poly: PolynomInfo, model: MagnetModel): self.__model = model self.__elements = elements self.__poly = [e.__getattribute__(poly.attName) for e in elements] self.__sign = poly.sign self.__polyIdx = poly.index self.__length: float = 0.0 for e in elements: self.__length += e.Length
[docs] def get_length(self) -> float: return self.__length
[docs] def get(self) -> float: s = 0 for idx, e in enumerate(self.__elements): s += self.__poly[idx][self.__polyIdx] * self.__sign * e.Length return self.__model.compute_hardware_values([s])[0]
[docs] def set(self, value: float): s = self.__model.compute_strengths([value])[0] for idx, _ in enumerate(self.__elements): self.__poly[idx][self.__polyIdx] = s / (self.__length * self.__sign)
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
[docs] def unit(self) -> str: return self.__model.get_hardware_units()[0]
[docs] def get_model(self) -> MagnetModel: return self.__model
# ------------------------------------------------------------------------------
[docs] class RWStrengthScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to a strength of a simulator """ def __init__(self, elements: list[at.Element], poly: PolynomInfo, model: MagnetModel): self.__model = model self.__elements = elements self.__poly = [e.__getattribute__(poly.attName) for e in elements] self.__sign = poly.sign self.__polyIdx = poly.index self.__length = 0 for e in elements: self.__length += e.Length
[docs] def get_element_length(self) -> float: return self.__length
# Gets the value
[docs] def get(self) -> float: s = 0 for idx, e in enumerate(self.__elements): s += self.__poly[idx][self.__polyIdx] * self.__sign * e.Length return s
# Sets the value
[docs] def set(self, value: float): for idx, _ in enumerate(self.__elements): self.__poly[idx][self.__polyIdx] = value / (self.__length * self.__sign)
# Sets the value and wait that the read value reach the setpoint
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs] def unit(self) -> str: return self.__model.get_strength_units()[0]
# ------------------------------------------------------------------------------
[docs] def get_model(self) -> MagnetModel: return self.__model
# ------------------------------------------------------------------------------
[docs] class RWSerializedHardware(abstract.ReadWriteFloatScalar): def __init__(self, elements: list[RWHardwareScalar], element_index: int): self.__elements = elements self.__element_index = element_index self.__total_length = 0 for e in elements: self.__total_length += e.get_length()
[docs] def get_element_length(self) -> float: return self.__elements[self.__element_index].get_length()
[docs] def get_total_length(self) -> float: return self.__total_length
# Gets the value
[docs] def get(self) -> float: return self.__elements[self.__element_index].get()
# Sets the value
[docs] def set(self, value: float): [element.set(value) for element in self.__elements]
# Sets the value and wait that the read value reach the setpoint
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs] def unit(self) -> str: return self.__elements[self.__element_index].unit()
[docs] def set_magnet_rigidity(self, brho: np.double): [element.get_model().set_magnet_rigidity(brho) for element in self.__elements]
[docs] class RWSerializedStrength(abstract.ReadWriteFloatScalar): def __init__( self, elements_strength: list[RWStrengthScalar], elements_hardware: list[RWHardwareScalar], element_index: int, ): self.__element = elements_strength[element_index] self.__elements_strength = elements_strength self.__elements_hardware = elements_hardware self.__element_index = element_index self.__total_length = 0 for e in self.__elements_hardware: self.__total_length += e.get_length()
[docs] def get_element_length(self) -> float: return self.__element.get_element_length()
[docs] def get_total_length(self) -> float: return self.__total_length
# Gets the value
[docs] def get(self) -> float: return self.__elements_strength[self.__element_index].get()
# Sets the value
[docs] def set(self, value: float): elements_values = [value * e.get_length() / self.get_total_length() for e in self.__elements_hardware] self.__element.set(elements_values[self.__element_index]) # compute the local hardware value hardware_value = self.__elements_hardware[self.__element_index].get() # compute the total hardware value total_hardware = hardware_value * self.get_total_length() / self.get_element_length() # dispatch this value for index, element in enumerate(self.__elements_hardware): if index != self.__element_index: element.set(total_hardware * element.get_length() / self.get_total_length())
# Sets the value and wait that the read value reach the setpoint
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs] def unit(self) -> str: return self.__element.unit()
[docs] def set_magnet_rigidity(self, brho: np.double): pass
[docs] class RWHardwareArray(abstract.ReadWriteFloatArray): """ Class providing read write access to a magnet of a simulator in hardware units. Hardware units are converted from strengths using the magnet model """ def __init__(self, elements: list[at.Element], poly: list[PolynomInfo], model: MagnetModel): self.__elements = elements self.__poly = [] self.__polyIdx = [] self.__sign = [] self.__model = model for p in poly: self.__poly.append(elements[0].__getattribute__(p.attName)) self.__polyIdx.append(p.index) self.__sign.append(p.sign) # Gets the value
[docs] def get(self) -> np.array: nbStrength = len(self.__poly) s = np.zeros(nbStrength) for i in range(nbStrength): s[i] = self.__poly[i][self.__polyIdx[i]] * self.__sign[i] * self.__elements[0].Length return self.__model.compute_hardware_values(s)
# Sets the value
[docs] def set(self, value: np.array): nbStrength = len(self.__poly) s = self.__model.compute_strengths(value) for i in range(nbStrength): self.__poly[i][self.__polyIdx[i]] = s[i] / (self.__elements[0].Length * self.__sign[i])
# Sets the value and wait that the read value reach the setpoint
[docs] def set_and_wait(self, value: np.array): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs] def unit(self) -> list[str]: return self.__model.get_hardware_units()
# ------------------------------------------------------------------------------
[docs] class RWStrengthArray(abstract.ReadWriteFloatArray): """ Class providing read write access to a strength (array) of a simulator """ def __init__(self, elements: list[at.Element], poly: list[PolynomInfo], model: MagnetModel): self.__elements = elements self.__poly = [] self.__polyIdx = [] self.__sign = [] self.__model = model for p in poly: self.__poly.append(elements[0].__getattribute__(p.attName)) self.__polyIdx.append(p.index) self.__sign.append(p.sign) # Gets the value
[docs] def get(self) -> np.array: nbStrength = len(self.__poly) s = np.zeros(nbStrength) for i in range(nbStrength): s[i] = self.__poly[i][self.__polyIdx[i]] * self.__sign[i] * self.__elements[0].Length return s
# Sets the value
[docs] def set(self, value: np.array): nbStrength = len(self.__poly) s = np.zeros(nbStrength) for i in range(nbStrength): self.__poly[i][self.__polyIdx[i]] = value[i] / (self.__elements[0].Length * self.__sign[i])
# Sets the value and wait that the read value reach the setpoint
[docs] def set_and_wait(self, value: np.array): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs] def unit(self) -> list[str]: return self.__model.get_strength_units()
# ------------------------------------------------------------------------------
[docs] class BPMScalarAggregator(ScalarAggregator): """ BPM simulator aggregator """ def __init__(self, ring: at.Lattice): self.lattice = ring self.refpts = []
[docs] def add_elem(self, elem: at.Element): self.refpts.append(self.lattice.index(elem))
[docs] def set(self, value: NDArray[np.float64]): pass
[docs] def set_and_wait(self, value: NDArray[np.float64]): pass
[docs] def get(self) -> np.array: _, orbit = at.find_orbit(self.lattice, refpts=self.refpts) return orbit[:, [0, 2]].flatten()
[docs] def readback(self) -> np.array: return self.get()
[docs] def unit(self) -> str: return "m"
# ------------------------------------------------------------------------------
[docs] class BPMHScalarAggregator(BPMScalarAggregator): """ Vertical BPM simulator aggregator """
[docs] def get(self) -> np.array: _, orbit = at.find_orbit(self.lattice, refpts=self.refpts) return orbit[:, 0]
# ------------------------------------------------------------------------------
[docs] class BPMVScalarAggregator(BPMScalarAggregator): """ Horizontal BPM simulator aggregator """
[docs] def get(self) -> np.array: _, orbit = at.find_orbit(self.lattice, refpts=self.refpts) return orbit[:, 2]
# ------------------------------------------------------------------------------
[docs] class RBpmArray(abstract.ReadFloatArray): """ Class providing read access to a BPM position (array) of a simulator. Position in pyAT is calculated using find_orbit function, which returns the orbit at a specified index. The position is then extracted from the orbit array as the first two elements (x, y). """ def __init__(self, element: at.Element, lattice: at.Lattice): self.__element = element self.__lattice = lattice # Gets the value
[docs] def get(self) -> np.array: index = self.__lattice.index(self.__element) _, orbit = at.find_orbit(self.__lattice, refpts=index) return orbit[0, [0, 2]]
# Gets the unit of the value
[docs] def unit(self) -> str: return "m"
# ------------------------------------------------------------------------------
[docs] class RWBpmOffsetArray(abstract.ReadWriteFloatArray): """ Class providing read write access to a BPM offset (array) of a simulator. Offset in pyAT is defined in Offset attribute as a 2-element array. """ def __init__(self, element: at.Element): self.__element = element try: self.__offset = element.__getattribute__("Offset") except AttributeError: self.__offset = None # Gets the value
[docs] def get(self) -> np.array: if self.__offset is None: raise PyAMLException("Element does not have an Offset attribute.") return self.__offset
# Sets the value
[docs] def set(self, value: np.array): if self.__offset is None: raise PyAMLException("Element does not have an Offset attribute.") if len(value) != 2: raise PyAMLException("BPM offset must be a 2-element array.") self.__offset = value
# Sets the value and wait that the read value reach the setpoint
[docs] def set_and_wait(self, value: np.array): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs] def unit(self) -> str: return "m" # Assuming all offsets are in m
# ------------------------------------------------------------------------------
[docs] class RWBpmTiltScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to a BPM tilt of a simulator. Tilt in pyAT is defined in Rotation attribute as a first element. """ def __init__(self, element: at.Element): self.__element = element try: self.__tilt = element.__getattribute__("Rotation")[0] except AttributeError: self.__tilt = None # Gets the value
[docs] def get(self) -> float: if self.__tilt is None: raise ValueError("Element does not have a Tilt attribute.") return self.__tilt
# Sets the value
[docs] def set( self, value: float, ): self.__tilt = value self.__element.__setattr__("Rotation", [value, None, None])
# Sets the value and wait that the read value reach the setpoint
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs] def unit(self) -> str: return "rad" # Assuming BPM tilts are in rad
# ------------------------------------------------------------------------------
[docs] class RWRFVoltageScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to a cavity voltage of a simulator for a given RF trasnmitter. """ def __init__(self, elements: list[at.Element]): self.__elements = elements
[docs] def get(self) -> float: sum = 0 for _idx, e in enumerate(self.__elements): sum += e.Voltage return sum
[docs] def set(self, value: float): v = value / len(self.__elements) for e in self.__elements: e.Voltage = v
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
[docs] def unit(self) -> str: return "V"
# ------------------------------------------------------------------------------
[docs] class RWRFPhaseScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to a cavity phase of a simulator for a given RF trasnmitter. """ def __init__(self, elements: list[at.Element]): self.__elements = elements
[docs] def get(self) -> float: # Assume that all cavities of this transmitter # have the same Time Lag and Frequency wavelength = speed_of_light / self.__elements[0].Frequency return (wavelength / self.__elements[0].TimeLag) * 2.0 * np.pi
[docs] def set(self, value: float): wavelength = speed_of_light / self.__elements[0].Frequency for e in self.__elements: e.TimeLag = wavelength * value / (2.0 * np.pi)
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
[docs] def unit(self) -> str: return "rad"
# ------------------------------------------------------------------------------
[docs] class RWRFFrequencyScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to RF frequency of a simulator. """ def __init__(self, elements: list[at.Element], harmonics: list[float]): self.__elements = elements self.__harm = harmonics
[docs] def get(self) -> float: # Serialized cavity has the same frequency return self.__elements[0].Frequency
[docs] def set(self, value: float): for idx, e in enumerate(self.__elements): e.Frequency = value * self.__harm[idx]
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
[docs] def unit(self) -> str: return "Hz"
# ------------------------------------------------------------------------------
[docs] class RWRFATFrequencyScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to RF frequency of a simulator using AT methods. """ def __init__(self, ring: at.Lattice): self.__ring = ring
[docs] def get(self) -> float: return self.__ring.get_rf_frequency()
[docs] def set(self, value: float): self.__ring.set_rf_frequency(value)
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
[docs] def unit(self) -> str: return "Hz"
# ------------------------------------------------------------------------------
[docs] class RWRFATotalVoltageScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to a RF voltage of a simulator using AT methods. """ def __init__(self, ring: at.Lattice): self.__ring = ring
[docs] def get(self) -> float: return self.__ring.get_rf_voltage()
[docs] def set(self, value: float): self.__ring.set_rf_voltage(value)
[docs] def set_and_wait(self, value: float): raise NotImplementedError("Not implemented yet.")
[docs] def unit(self) -> str: return "V"
# ------------------------------------------------------------------------------
[docs] class RBetatronTuneArray(abstract.ReadFloatArray): """ Class providing read-only access to the betatron tune of a ring. """ def __init__(self, ring: at.Lattice): self.__ring = ring
[docs] def get(self) -> float: return self.__ring.get_tune()[:2]
[docs] def unit(self) -> str: return "1"
# ------------------------------------------------------------------------------