import at
import numpy as np
from numpy.typing import NDArray
from scipy.constants import speed_of_light
from ..common import abstract
from ..common.abstract_aggregator import ScalarAggregator
from ..common.exception import PyAMLException
from ..magnet.model import MagnetModel
from ..rf.rf_plant import RFPlant
from ..rf.rf_transmitter import RFTransmitter
from .polynom_info import PolynomInfo
# TODO handle serialized magnets for magnet array
# ------------------------------------------------------------------------------
[docs]
class RWHardwareScalar(abstract.ReadWriteFloatScalar):
    """
    Read/write access, in hardware unit, to a magnet of a simulator.

    The integrated strength stored in the lattice elements is converted to and
    from the hardware unit by the magnet model.
    """

    def __init__(self, elements: list[at.Element], poly: PolynomInfo, model: MagnetModel):
        self.__model = model
        self.__elements = elements
        # One reference per element to the polynomial attribute named by `poly`
        self.__poly = [getattr(elem, poly.attName) for elem in elements]
        self.__sign = poly.sign
        self.__polyIdx = poly.index
        # Cumulated length of all the elements
        self.__length: float = sum((elem.Length for elem in elements), 0.0)

    def get_length(self) -> float:
        """Return the cumulated length of the elements."""
        return self.__length

    def get(self) -> float:
        """
        Return the hardware value computed by the model from the integrated
        strength (coefficient * sign * length, summed over the elements).
        """
        integrated = sum(
            coeffs[self.__polyIdx] * self.__sign * elem.Length
            for coeffs, elem in zip(self.__poly, self.__elements)
        )
        return self.__model.compute_hardware_values([integrated])[0]

    def set(self, value: float):
        """
        Convert `value` to a strength with the model and write the same
        coefficient (strength / (total length * sign)) in every element.
        """
        strength = self.__model.compute_strengths([value])[0]
        for coeffs in self.__poly:
            coeffs[self.__polyIdx] = strength / (self.__length * self.__sign)

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the first hardware unit reported by the model."""
        return self.__model.get_hardware_units()[0]

    def get_model(self) -> MagnetModel:
        """Return the magnet model used for the conversions."""
        return self.__model
# ------------------------------------------------------------------------------
[docs]
class RWStrengthScalar(abstract.ReadWriteFloatScalar):
    """
    Read/write access to the integrated strength of a magnet of a simulator.
    """

    def __init__(self, elements: list[at.Element], poly: PolynomInfo, model: MagnetModel):
        self.__model = model
        self.__elements = elements
        # One reference per element to the polynomial attribute named by `poly`
        self.__poly = [getattr(elem, poly.attName) for elem in elements]
        self.__sign = poly.sign
        self.__polyIdx = poly.index
        # Cumulated length of all the elements
        self.__length = sum(elem.Length for elem in elements)

    def get_element_length(self) -> float:
        """Return the cumulated length of the elements."""
        return self.__length

    def get(self) -> float:
        """
        Return the integrated strength: coefficient * sign * length, summed
        over the elements.
        """
        return sum(
            coeffs[self.__polyIdx] * self.__sign * elem.Length
            for coeffs, elem in zip(self.__poly, self.__elements)
        )

    def set(self, value: float):
        """
        Write the same coefficient (value / (total length * sign)) in every
        element.
        """
        for coeffs in self.__poly:
            coeffs[self.__polyIdx] = value / (self.__length * self.__sign)

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the first strength unit reported by the model."""
        return self.__model.get_strength_units()[0]

    def get_model(self) -> MagnetModel:
        """Return the magnet model attached to this accessor."""
        return self.__model
# ------------------------------------------------------------------------------
[docs]
class RWSerializedHardware(abstract.ReadWriteFloatScalar):
    """
    Hardware accessor for one magnet (selected by `element_index`) among a
    list of hardware accessors that are always written together.
    """

    def __init__(self, elements: list[RWHardwareScalar], element_index: int):
        self.__elements = elements
        self.__element_index = element_index
        # Sum of the lengths reported by every accessor of the list
        self.__total_length = sum(accessor.get_length() for accessor in elements)

    def get_element_length(self) -> float:
        """Return the length of the accessor selected by `element_index`."""
        selected = self.__elements[self.__element_index]
        return selected.get_length()

    def get_total_length(self) -> float:
        """Return the summed length of all the accessors."""
        return self.__total_length

    def get(self) -> float:
        """Return the hardware value of the selected accessor."""
        selected = self.__elements[self.__element_index]
        return selected.get()

    def set(self, value: float):
        """Write the same hardware value to every accessor of the list."""
        for accessor in self.__elements:
            accessor.set(value)

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the selected accessor."""
        selected = self.__elements[self.__element_index]
        return selected.unit()

    def set_magnet_rigidity(self, brho: np.double):
        """Forward the magnet rigidity to the model of every accessor."""
        for accessor in self.__elements:
            accessor.get_model().set_magnet_rigidity(brho)
[docs]
class RWSerializedStrength(abstract.ReadWriteFloatScalar):
    """
    Strength accessor for one magnet (selected by `element_index`) among a
    list of magnets. Writing the strength of the selected magnet also updates
    the hardware value of all the other magnets of the list.
    """

    def __init__(
        self,
        elements_strength: list[RWStrengthScalar],
        elements_hardware: list[RWHardwareScalar],
        element_index: int,
    ):
        self.__element = elements_strength[element_index]
        self.__elements_strength = elements_strength
        self.__elements_hardware = elements_hardware
        self.__element_index = element_index
        # Sum of the lengths reported by the hardware accessors
        self.__total_length = sum(hw.get_length() for hw in self.__elements_hardware)

    def get_element_length(self) -> float:
        """Return the length reported by the selected strength accessor."""
        return self.__element.get_element_length()

    def get_total_length(self) -> float:
        """Return the summed length of all the hardware accessors."""
        return self.__total_length

    def get(self) -> float:
        """Return the strength of the selected magnet."""
        selected = self.__elements_strength[self.__element_index]
        return selected.get()

    def set(self, value: float):
        """
        Write the selected magnet's share of `value` (proportional to its
        length), then propagate the resulting hardware value to the others.
        """
        own_hardware = self.__elements_hardware[self.__element_index]
        # Share of the requested strength that belongs to the selected magnet
        self.__element.set(value * own_hardware.get_length() / self.get_total_length())
        # Hardware value now seen by the selected magnet ...
        local_hardware = own_hardware.get()
        # ... scaled up to the whole list
        total_hardware = local_hardware * self.get_total_length() / self.get_element_length()
        # Give every other magnet its length-proportional part of that total
        for position, hardware in enumerate(self.__elements_hardware):
            if position == self.__element_index:
                continue
            hardware.set(total_hardware * hardware.get_length() / self.get_total_length())

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the selected strength accessor."""
        return self.__element.unit()

    def set_magnet_rigidity(self, brho: np.double):
        """Do nothing: the rigidity is ignored by this accessor."""
        return None
[docs]
class RWHardwareArray(abstract.ReadWriteFloatArray):
    """
    Read/write access, in hardware units, to the strengths of a magnet of a
    simulator. Conversions between strengths and hardware units are delegated
    to the magnet model. Only the first element of `elements` is used.
    """

    def __init__(self, elements: list[at.Element], poly: list[PolynomInfo], model: MagnetModel):
        self.__elements = elements
        self.__model = model
        # For each polynom description: attribute of the first element,
        # index of the coefficient inside it, and sign convention
        self.__poly = [getattr(elements[0], info.attName) for info in poly]
        self.__polyIdx = [info.index for info in poly]
        self.__sign = [info.sign for info in poly]

    def get(self) -> np.array:
        """Return the hardware values computed from the integrated strengths."""
        strengths = np.zeros(len(self.__poly))
        channels = zip(self.__poly, self.__polyIdx, self.__sign)
        for i, (coeffs, coeff_idx, sign) in enumerate(channels):
            strengths[i] = coeffs[coeff_idx] * sign * self.__elements[0].Length
        return self.__model.compute_hardware_values(strengths)

    def set(self, value: np.array):
        """
        Convert the hardware values to strengths and write the corresponding
        coefficients (strength / (length * sign)).
        """
        strengths = self.__model.compute_strengths(value)
        channels = zip(self.__poly, self.__polyIdx, self.__sign)
        for i, (coeffs, coeff_idx, sign) in enumerate(channels):
            coeffs[coeff_idx] = strengths[i] / (self.__elements[0].Length * sign)

    def set_and_wait(self, value: np.array):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> list[str]:
        """Return the hardware units reported by the model."""
        return self.__model.get_hardware_units()
# ------------------------------------------------------------------------------
[docs]
class RWStrengthArray(abstract.ReadWriteFloatArray):
    """
    Read/write access to the integrated strengths (array) of a magnet of a
    simulator. Only the first element of `elements` is used.
    """

    def __init__(self, elements: list[at.Element], poly: list[PolynomInfo], model: MagnetModel):
        self.__elements = elements
        self.__model = model
        # For each polynom description: attribute of the first element,
        # index of the coefficient inside it, and sign convention
        self.__poly = [getattr(elements[0], info.attName) for info in poly]
        self.__polyIdx = [info.index for info in poly]
        self.__sign = [info.sign for info in poly]

    def get(self) -> np.array:
        """Return the integrated strengths (coefficient * sign * length)."""
        strengths = np.zeros(len(self.__poly))
        channels = zip(self.__poly, self.__polyIdx, self.__sign)
        for i, (coeffs, coeff_idx, sign) in enumerate(channels):
            strengths[i] = coeffs[coeff_idx] * sign * self.__elements[0].Length
        return strengths

    def set(self, value: np.array):
        """
        Write the coefficients corresponding to the integrated strengths in
        `value` (value[i] / (length * sign)), one per polynom description.
        """
        # The scratch array the previous implementation allocated here was
        # never read, so it has been dropped.
        channels = zip(self.__poly, self.__polyIdx, self.__sign)
        for i, (coeffs, coeff_idx, sign) in enumerate(channels):
            coeffs[coeff_idx] = value[i] / (self.__elements[0].Length * sign)

    def set_and_wait(self, value: np.array):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> list[str]:
        """Return the strength units reported by the model."""
        return self.__model.get_strength_units()
# ------------------------------------------------------------------------------
[docs]
class BPMScalarAggregator(ScalarAggregator):
    """
    BPM simulator aggregator: reads the orbit at all registered elements
    with a single orbit computation.
    """

    def __init__(self, ring: at.Lattice):
        self.lattice = ring
        # Lattice indices of the registered elements, in registration order
        self.refpts = []

    def add_elem(self, elem: at.Element):
        """Register `elem` by its index in the lattice."""
        position = self.lattice.index(elem)
        self.refpts.append(position)

    def set(self, value: NDArray[np.float64]):
        """Do nothing: BPM readings cannot be written."""
        return None

    def set_and_wait(self, value: NDArray[np.float64]):
        """Do nothing: BPM readings cannot be written."""
        return None

    def get(self) -> np.array:
        """
        Return columns 0 and 2 of the orbit at every registered element,
        flattened row by row.
        """
        orbit = at.find_orbit(self.lattice, refpts=self.refpts)[1]
        positions = orbit[:, [0, 2]]
        return positions.flatten()

    def readback(self) -> np.array:
        """Return the same values as get()."""
        return self.get()

    def unit(self) -> str:
        """Return the unit of the readings."""
        return "m"
# ------------------------------------------------------------------------------
[docs]
class BPMHScalarAggregator(BPMScalarAggregator):
    """
    Horizontal BPM simulator aggregator
    """

    def get(self) -> np.array:
        """Return column 0 of the orbit (x) at every registered element."""
        _, orbit = at.find_orbit(self.lattice, refpts=self.refpts)
        return orbit[:, 0]
# ------------------------------------------------------------------------------
[docs]
class BPMVScalarAggregator(BPMScalarAggregator):
    """
    Vertical BPM simulator aggregator
    """

    def get(self) -> np.array:
        """Return column 2 of the orbit (y) at every registered element."""
        _, orbit = at.find_orbit(self.lattice, refpts=self.refpts)
        return orbit[:, 2]
# ------------------------------------------------------------------------------
[docs]
class RBpmArray(abstract.ReadFloatArray):
    """
    Read access to the position (array) of a BPM of a simulator.

    The orbit at the BPM is computed with pyAT find_orbit; the position is
    made of entries 0 and 2 of that orbit (x, y).
    """

    def __init__(self, element: at.Element, lattice: at.Lattice):
        self.__element = element
        self.__lattice = lattice

    def get(self) -> np.array:
        """Return the (x, y) position at the BPM."""
        # The index is looked up on every call rather than cached
        refpt = self.__lattice.index(self.__element)
        orbit = at.find_orbit(self.__lattice, refpts=refpt)[1]
        return orbit[0, [0, 2]]

    def unit(self) -> str:
        """Return the unit of the position."""
        return "m"
# ------------------------------------------------------------------------------
[docs]
class RWBpmOffsetArray(abstract.ReadWriteFloatArray):
    """
    Class providing read write access to a BPM offset (array) of a simulator.
    Offset in pyAT is defined in Offset attribute as a 2-element array.
    """

    def __init__(self, element: at.Element):
        self.__element = element
        # None marks an element that has no Offset attribute
        self.__offset = getattr(element, "Offset", None)

    def get(self) -> np.array:
        """
        Return the BPM offset.

        Raises:
            PyAMLException: if the element had no Offset attribute when this
                accessor was created.
        """
        if self.__offset is None:
            raise PyAMLException("Element does not have an Offset attribute.")
        return self.__offset

    def set(self, value: np.array):
        """
        Write a new 2-element offset to the element.

        Raises:
            PyAMLException: if the element had no Offset attribute when this
                accessor was created, or if `value` does not have 2 entries.
        """
        if self.__offset is None:
            raise PyAMLException("Element does not have an Offset attribute.")
        if len(value) != 2:
            raise PyAMLException("BPM offset must be a 2-element array.")
        # Write through to the lattice element: rebinding only the cached
        # reference would leave the simulator untouched.
        self.__element.Offset = value
        # Re-read so that get() returns exactly what the element now holds
        self.__offset = self.__element.Offset

    def set_and_wait(self, value: np.array):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the offset."""
        return "m"  # Assuming all offsets are in m
# ------------------------------------------------------------------------------
[docs]
class RWBpmTiltScalar(abstract.ReadWriteFloatScalar):
    """
    Read/write access to the tilt of a BPM of a simulator. The tilt is the
    first entry of the element's Rotation attribute.
    """

    def __init__(self, element: at.Element):
        self.__element = element
        try:
            rotation = getattr(element, "Rotation")
            self.__tilt = rotation[0]
        except AttributeError:
            # None marks an element that has no Rotation attribute
            self.__tilt = None

    def get(self) -> float:
        """
        Return the tilt cached by this accessor.

        Raises:
            ValueError: if no tilt is available.
        """
        if self.__tilt is not None:
            return self.__tilt
        raise ValueError("Element does not have a Tilt attribute.")

    def set(
        self,
        value: float,
    ):
        """Cache `value` and write it as first entry of a new Rotation."""
        self.__tilt = value
        # NOTE(review): the two other entries are written as None, whatever
        # the element held before - confirm this is intended.
        new_rotation = [value, None, None]
        setattr(self.__element, "Rotation", new_rotation)

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the tilt."""
        return "rad"  # Assuming BPM tilts are in rad
# ------------------------------------------------------------------------------
[docs]
class RWRFVoltageScalar(abstract.ReadWriteFloatScalar):
    """
    Read/write access to the cavity voltage of a simulator for a given
    RF transmitter.
    """

    def __init__(self, elements: list[at.Element]):
        self.__elements = elements

    def get(self) -> float:
        """Return the sum of the voltages of all the cavities."""
        return sum(cavity.Voltage for cavity in self.__elements)

    def set(self, value: float):
        """Share `value` equally between all the cavities."""
        per_cavity = value / len(self.__elements)
        for cavity in self.__elements:
            cavity.Voltage = per_cavity

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the voltage."""
        return "V"
# ------------------------------------------------------------------------------
[docs]
class RWRFPhaseScalar(abstract.ReadWriteFloatScalar):
    """
    Class providing read write access to a cavity phase of
    a simulator for a given RF transmitter.

    The phase is stored in the cavities as a time lag expressed as a length:
    TimeLag = wavelength * phase / (2 * pi).
    """

    def __init__(self, elements: list[at.Element]):
        self.__elements = elements

    def get(self) -> float:
        """Return the phase derived from the time lag of the first cavity."""
        # Assume that all cavities of this transmitter
        # have the same Time Lag and Frequency
        cavity = self.__elements[0]
        wavelength = speed_of_light / cavity.Frequency
        # Exact inverse of set(): phase = 2 * pi * TimeLag / wavelength.
        # Dividing by TimeLag instead would not round-trip with set() and
        # would fail for a zero time lag.
        return (cavity.TimeLag / wavelength) * 2.0 * np.pi

    def set(self, value: float):
        """Write the time lag matching the phase `value` in every cavity."""
        # The wavelength is taken from the first cavity for all of them
        wavelength = speed_of_light / self.__elements[0].Frequency
        for e in self.__elements:
            e.TimeLag = wavelength * value / (2.0 * np.pi)

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the phase."""
        return "rad"
# ------------------------------------------------------------------------------
[docs]
class RWRFFrequencyScalar(abstract.ReadWriteFloatScalar):
    """
    Read/write access to the RF frequency of a simulator.
    """

    def __init__(self, elements: list[at.Element], harmonics: list[float]):
        self.__elements = elements
        # Multiplier applied to the written frequency, one per cavity
        self.__harm = harmonics

    def get(self) -> float:
        """Return the frequency of the first cavity."""
        # Serialized cavity has the same frequency
        # NOTE(review): set() multiplies by the harmonic while this value is
        # returned undivided - verify that harmonics[0] is always 1.
        first_cavity = self.__elements[0]
        return first_cavity.Frequency

    def set(self, value: float):
        """Write `value` times its harmonic in every cavity."""
        for position, cavity in enumerate(self.__elements):
            cavity.Frequency = value * self.__harm[position]

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the frequency."""
        return "Hz"
# ------------------------------------------------------------------------------
[docs]
class RWRFATFrequencyScalar(abstract.ReadWriteFloatScalar):
    """
    Read/write access to the RF frequency of a simulator, delegated to the
    RF frequency methods of the AT lattice.
    """

    def __init__(self, ring: at.Lattice):
        self.__ring = ring

    def get(self) -> float:
        """Return the RF frequency reported by the lattice."""
        frequency = self.__ring.get_rf_frequency()
        return frequency

    def set(self, value: float):
        """Pass `value` to the lattice as new RF frequency."""
        lattice = self.__ring
        lattice.set_rf_frequency(value)

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the frequency."""
        return "Hz"
# ------------------------------------------------------------------------------
[docs]
class RWRFATotalVoltageScalar(abstract.ReadWriteFloatScalar):
    """
    Read/write access to the RF voltage of a simulator, delegated to the
    RF voltage methods of the AT lattice.
    """

    def __init__(self, ring: at.Lattice):
        self.__ring = ring

    def get(self) -> float:
        """Return the RF voltage reported by the lattice."""
        voltage = self.__ring.get_rf_voltage()
        return voltage

    def set(self, value: float):
        """Pass `value` to the lattice as new RF voltage."""
        lattice = self.__ring
        lattice.set_rf_voltage(value)

    def set_and_wait(self, value: float):
        """Not available: always raises NotImplementedError."""
        raise NotImplementedError("Not implemented yet.")

    def unit(self) -> str:
        """Return the unit of the voltage."""
        return "V"
# ------------------------------------------------------------------------------
[docs]
class RBetatronTuneArray(abstract.ReadFloatArray):
    """
    Class providing read-only access to the betatron tune of a ring.
    """

    def __init__(self, ring: at.Lattice):
        self.__ring = ring

    def get(self) -> np.array:
        """Return the first two entries of the tune reported by the lattice."""
        # A two-element slice is returned, hence the array annotation
        return self.__ring.get_tune()[:2]

    def unit(self) -> str:
        """Return the unit of the tune (dimensionless)."""
        return "1"
# ------------------------------------------------------------------------------