# Source code for pyaml.arrays.bpm_array
import numpy as np
from ..bpm.bpm import BPM
from ..common.abstract import ReadFloatArray
from ..control.deviceaccesslist import DeviceAccessList
from .element_array import ElementArray
[docs]
class RWBPMPosition(ReadFloatArray):
    """
    Read access to BPM positions (horizontal and vertical).

    This class provides access to both horizontal and vertical positions
    of a list of BPMs. Values are read either BPM by BPM or, once an
    aggregator has been set with :py:meth:`set_aggregator`, through that
    device access list for improved performance.

    Only reading is implemented by this class: it derives from
    ``ReadFloatArray`` and defines no setter of its own.

    Parameters
    ----------
    name : str
        Name of the position accessor
    bpms : list[pyaml.bpm.bpm.BPM]
        List of BPM objects to access
    """

    def __init__(self, name: str, bpms: list[BPM]):
        self.__bpms = bpms
        self.__name = name
        # Stays unset until set_aggregator() is called
        self.__aggregator: DeviceAccessList = None

    # Gets the values
    def get(self) -> np.ndarray:
        """
        Get BPM positions.

        Returns
        -------
        np.ndarray
            Array of shape (n_bpms, 2) containing horizontal and vertical
            positions for each BPM. The aggregated read is reshaped to
            that layout; the per-BPM read stacks the result of each
            BPM's ``positions.get()`` (assumed to hold the two values).
        """
        if not self.__aggregator:
            if len(self.__bpms) == 0:
                # np.array([]) would have shape (0,); keep the documented
                # two-column layout even when there is nothing to read.
                return np.empty((0, 2))
            return np.array([b.positions.get() for b in self.__bpms])
        return self.__aggregator.get().reshape(len(self.__bpms), 2)

    # Gets the unit of the values
    def unit(self) -> list[str]:
        """
        Get the units for BPM positions.

        Returns
        -------
        list[str]
            One entry per BPM, as returned by that BPM's
            ``positions.unit()``
        """
        return [b.positions.unit() for b in self.__bpms]

    # Set the aggregator (Control system only)
    def set_aggregator(self, agg: DeviceAccessList):
        """
        Set the device access list aggregator for improved performance.

        Once a (truthy) aggregator is set, :py:meth:`get` reads through
        it instead of querying each BPM individually.

        Parameters
        ----------
        agg : DeviceAccessList
            Device access list for aggregated reads
        """
        self.__aggregator = agg
[docs]
class RWBPMSinglePosition(ReadFloatArray):
    """
    Read access to single axis BPM positions.

    This class provides access to either horizontal or vertical positions
    of a list of BPMs. Values are read either BPM by BPM or, once an
    aggregator has been set with :py:meth:`set_aggregator`, through that
    device access list for improved performance.

    Only reading is implemented by this class: it derives from
    ``ReadFloatArray`` and defines no setter of its own.

    Parameters
    ----------
    name : str
        Name of the position accessor
    bpms : list[pyaml.bpm.bpm.BPM]
        List of BPM objects to access
    idx : int
        Index for the position axis (0 for horizontal, 1 for vertical)
    """

    def __init__(self, name: str, bpms: list[BPM], idx: int):
        self.__bpms = bpms
        self.__name = name
        self.__idx = idx
        # Stays unset until set_aggregator() is called
        self.__aggregator: DeviceAccessList = None

    # Gets the values
    def get(self) -> np.ndarray:
        """
        Get single axis BPM positions.

        Returns
        -------
        np.ndarray
            Array of positions for the specified axis (horizontal or
            vertical). Without an aggregator, element ``idx`` of each
            BPM's ``positions.get()`` is collected; with one, the
            aggregator's result is returned as is.
        """
        if not self.__aggregator:
            return np.array([b.positions.get()[self.__idx] for b in self.__bpms])
        return self.__aggregator.get()

    # Gets the unit of the values
    def unit(self) -> list[str]:
        """
        Get the units for BPM positions.

        Returns
        -------
        list[str]
            One entry per BPM, as returned by that BPM's
            ``positions.unit()``
        """
        return [b.positions.unit() for b in self.__bpms]

    # Set the aggregator (Control system only)
    def set_aggregator(self, agg: DeviceAccessList):
        """
        Set the device access list aggregator for improved performance.

        Once a (truthy) aggregator is set, :py:meth:`get` reads through
        it instead of querying each BPM individually.

        Parameters
        ----------
        agg : DeviceAccessList
            Device access list for aggregated reads
        """
        self.__aggregator = agg
[docs]
class BPMArray(ElementArray):
    """
    Array of BPMs giving grouped access to their positions.

    Parameters
    ----------
    arrayName : str
        Name of the array
    bpms : list[pyaml.bpm.bpm.BPM]
        BPMs forming the array. All elements must be attached to the
        same instance of either a
        :py:class:`~pyaml.lattice.simulator.Simulator` or a
        :py:class:`~pyaml.control.controlsystem.ControlSystem`.
    use_aggregator : bool
        Use aggregators to increase performance by using parallel
        access to underlying devices. They are only requested when the
        array is not empty.

    Example
    -------

    An array can be retrieved from the configuration as in the following
    example:

    .. code-block:: python

        >>> sr = Accelerator.load("acc.yaml")  # Load the accelerator
        >>> bpms = sr.design.get_bpms("BPM")  # Retrieve BPM array
        >>> orbit = bpms.positions.get()  # Get the orbit

    It can also be created directly in code from a list of
    :py:class:`~pyaml.bpm.bpm.BPM` objects.
    """

    def __init__(self, arrayName: str, bpms: list[BPM], use_aggregator=True):
        super().__init__(arrayName, bpms, use_aggregator)

        # One accessor for both planes, then one per plane
        # (index 0: horizontal, index 1: vertical).
        self.__hvpos = RWBPMPosition(arrayName, bpms)
        self.__hpos = RWBPMSinglePosition(arrayName, bpms, 0)
        self.__vpos = RWBPMSinglePosition(arrayName, bpms, 1)

        if not use_aggregator or len(bpms) == 0:
            return

        # The peer returns the aggregators in the order
        # [both planes, horizontal, vertical].
        aggregators = self.get_peer().create_bpm_aggregators(bpms)
        accessors = (self.__hvpos, self.__hpos, self.__vpos)
        for slot, accessor in enumerate(accessors):
            accessor.set_aggregator(aggregators[slot])

    @property
    def positions(self) -> RWBPMPosition:
        """
        Accessor for the positions (both planes) of every BPM of this array
        """
        return self.__hvpos

    @property
    def h(self) -> RWBPMSinglePosition:
        """
        Accessor for the horizontal position of every BPM of this array
        """
        return self.__hpos

    @property
    def v(self) -> RWBPMSinglePosition:
        """
        Accessor for the vertical position of every BPM of this array
        """
        return self.__vpos