Source code for pyaml.arrays.magnet_array

import numpy as np

from ..common.abstract import ReadWriteFloatArray
from ..common.abstract_aggregator import ScalarAggregator
from ..magnet.magnet import Magnet
from .element_array import ElementArray


[docs] class RWMagnetStrength(ReadWriteFloatArray): def __init__(self, name: str, magnets: list[Magnet]): self.__name = name self.__magnets = magnets self.__nb = len(self.__magnets) self.__aggregator: ScalarAggregator = None # Gets the values
[docs] def get(self) -> np.array: if not self.__aggregator: return np.array([m.strength.get() for m in self.__magnets]) else: return self.__aggregator.get()
# Sets the values
[docs] def set(self, value: np.array): nvalue = np.ones(self.__nb) * value if isinstance(value, float) else value if not self.__aggregator: for idx, m in enumerate(self.__magnets): m.strength.set(nvalue[idx]) else: self.__aggregator.set(nvalue)
# Sets the values and waits that the read values reach their setpoint
[docs] def set_and_wait(self, value: np.array): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the values
[docs] def unit(self) -> list[str]: return [m.strength.unit() for m in self.__magnets]
# Set the aggregator (Control system only)
[docs] def set_aggregator(self, agg: ScalarAggregator): self.__aggregator = agg
[docs] class RWMagnetHardware(ReadWriteFloatArray): def __init__(self, name: str, magnets: list[Magnet]): self.__name = name self.__magnets = magnets self.__nb = len(self.__magnets) self.__aggregator: ScalarAggregator = None # Gets the values
[docs] def get(self) -> np.array: if not self.__aggregator: return np.array([m.hardware.get() for m in self.__magnets]) else: return self.__aggregator.get()
# Sets the values
[docs] def set(self, value: np.array): nvalue = np.ones(self.__nb) * value if isinstance(value, float) else value if not self.__aggregator: for idx, m in enumerate(self.__magnets): m.hardware.set(value[idx]) else: self.__aggregator.set(value)
# Sets the values and waits that the read values reach their setpoint
[docs] def set_and_wait(self, value: np.array): raise NotImplementedError("Not implemented yet.")
# Gets the unit of the values
[docs] def unit(self) -> list[str]: return [m.hardware.unit() for m in self.__magnets]
# Set the aggregator
[docs] def set_aggregator(self, agg: ScalarAggregator): self.__aggregator = agg
[docs] class MagnetArray(ElementArray): """ Class that implements access to a magnet array Parameters ---------- arrayName : str Array name magnets : list[Magnet] Magnet list, all elements must be attached to the same instance of either a Simulator or a ControlSystem. use_aggregator : bool Use aggregator to increase performance by using paralell access to underlying devices. Example ------- An array can be retrieved from the configuration as in the following example:: sr = Accelerator.load("acc.yaml") quads = sr.design.get_magnets("QuadForTune") """ def __init__(self, arrayName: str, magnets: list[Magnet], use_aggregator=True): super().__init__(arrayName, magnets, use_aggregator) self.__rwstrengths = RWMagnetStrength(arrayName, magnets) self.__rwhardwares = RWMagnetHardware(arrayName, magnets) if use_aggregator and len(magnets) > 0: aggs = self.get_peer().create_magnet_strength_aggregator(magnets) aggh = self.get_peer().create_magnet_hardware_aggregator(magnets) self.__rwstrengths.set_aggregator(aggs) self.__rwhardwares.set_aggregator(aggh) @property def strengths(self) -> RWMagnetStrength: """ Give access to strength of each magnet of this array """ return self.__rwstrengths @property def hardwares(self) -> RWMagnetHardware: """ Give access to hardware value of each magnet of this array """ return self.__rwhardwares