Source code for pyaml.arrays.cfm_magnet_array

import numpy as np

from ..common.abstract import ReadWriteFloatArray
from ..common.exception import PyAMLException
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from .element_array import ElementArray

# TODO handle aggregator for CFM


[docs] class RWMagnetStrengths(ReadWriteFloatArray): def __init__(self, name: str, magnets: list[CombinedFunctionMagnet]): self.__name = name self.__magnets = magnets self.__nb = sum(m.nb_multipole() for m in magnets) # Gets the values
[docs] def get(self) -> np.array: r = np.zeros(self.__nb) idx = 0 for m in self.__magnets: r[idx : idx + m.nb_multipole()] = m.strengths.get() idx += m.nb_multipole() return r
# Sets the values
[docs] def set(self, value: np.array): nvalue = np.ones(self.__nb) * value if isinstance(value, float) else value idx = 0 for m in self.__magnets: m.strengths.set(nvalue[idx : idx + m.nb_multipole()]) idx += m.nb_multipole()
# Sets the values and waits that the read values reach their setpoint
[docs] def set_and_wait(self, value: np.array): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the values
[docs] def unit(self) -> list[str]: r = [] for m in self.__magnets: r.extend(m.strengths.unit()) return r
[docs] class RWMagnetHardwares(ReadWriteFloatArray): def __init__(self, name: str, magnets: list[CombinedFunctionMagnet]): self.__name = name self.__magnets = magnets self.__nb = sum(m.nb_multipole() for m in magnets) # Gets the values
[docs] def get(self) -> np.array: r = np.zeros(self.__nb) idx = 0 for m in self.__magnets: r[idx : idx + m.nb_multipole()] = m.hardwares.get() idx += m.nb_multipole() return r
# Sets the values
[docs] def set(self, value: np.array): nvalue = np.ones(self.__nb) * value if isinstance(value, float) else value idx = 0 for m in self.__magnets: m.hardwares.set(nvalue[idx : idx + m.nb_multipole()]) idx += m.nb_multipole()
# Sets the values and waits that the read values reach their setpoint
[docs] def set_and_wait(self, value: np.array): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the values
[docs] def unit(self) -> list[str]: r = [] for m in self.__magnets: r.extend(m.hardwares.unit()) return r
[docs] class CombinedFunctionMagnetArray(ElementArray): """ Class that implements access to a combined function magnet array Parameters ---------- arrayName : str Array name magnets : list[Magnet] Magnet list, all elements must be attached to the same instance of either a Simulator or a ControlSystem. use_aggregator : bool Use aggregator to increase performance by using paralell access to underlying devices. """ def __init__( self, arrayName: str, magnets: list[CombinedFunctionMagnet], use_aggregator=False, ): super().__init__(arrayName, magnets, use_aggregator) self.__rwstrengths = RWMagnetStrengths(arrayName, magnets) self.__rwhardwares = RWMagnetHardwares(arrayName, magnets) if use_aggregator: raise (PyAMLException("Aggregator not implemented for CombinedFunctionMagnetArray")) @property def strengths(self) -> RWMagnetStrengths: """ Give access to strength of each magnet of this array """ return self.__rwstrengths @property def hardwares(self) -> RWMagnetHardwares: """ Give access to hardware value of each magnet of this array """ return self.__rwhardwares