from typing import Any
import numpy as np
from numpy import double
from numpy.typing import NDArray
from .. import PyAMLException
from ..common import abstract
from ..common.abstract_aggregator import ScalarAggregator
from ..control.deviceaccess import DeviceAccess
from ..control.deviceaccesslist import DeviceAccessList
from ..magnet.magnet import Magnet
from ..magnet.model import MagnetModel
from ..rf.rf_plant import RFPlant
from ..rf.rf_transmitter import RFTransmitter
# ------------------------------------------------------------------------------
[docs]
def check_range(values: Any, dev_range: Any) -> bool:
"""
Check whether values are within given ranges.
Inverted semantics:
- True -> all checks pass (everything is within bounds)
- False -> at least one check fails (out of range)
dev_range format (flat):
[min1, max1, min2, max2, ...]
Broadcasting rules:
Let N = number of values, K = number of ranges (pairs).
- N == K : one range per value
- N == 1 and K > 1: the single value must satisfy ALL ranges
- N > 1 and K == 1: the single range applies to ALL values
"""
# ---- Normalize values to a 1D float array ----
v = np.asarray(values, dtype=float)
if v.ndim == 0:
v = v.reshape(1)
else:
v = v.ravel()
n = v.size
# ---- Normalize dev_range (object to preserve None) ----
r = np.asarray(dev_range, dtype=object).ravel()
if (r.size % 2) != 0:
raise ValueError(f"dev_range must have an even length, got {r.size}")
mins_obj = r[0::2]
maxs_obj = r[1::2]
k = mins_obj.size
# ---- Broadcasting rules ----
if n == k:
vv = v
mins = mins_obj
maxs = maxs_obj
elif n == 1 and k > 1:
vv = np.full(k, v[0], dtype=float)
mins = mins_obj
maxs = maxs_obj
elif n > 1 and k == 1:
vv = v
mins = np.full(n, mins_obj[0], dtype=object)
maxs = np.full(n, maxs_obj[0], dtype=object)
else:
raise ValueError(f"Inconsistent sizes: {n} value(s) for {k} range(s). Supported: N==K, N==1, or K==1.")
# ---- Replace None bounds with -inf / +inf (NumPy-safe) ----
mins_is_none = np.equal(mins, None)
maxs_is_none = np.equal(maxs, None)
mins_f = np.where(mins_is_none, -np.inf, mins).astype(float)
maxs_f = np.where(maxs_is_none, +np.inf, maxs).astype(float)
# ---- Vectorized range check ----
return bool(np.all((vv >= mins_f) & (vv <= maxs_f)))
def _as_1d_float_array(values: Any) -> np.ndarray:
"""Normalize input values to a 1D float NumPy array."""
v = np.asarray(values, dtype=float)
if v.ndim == 0:
return v.reshape(1)
return v.ravel()
def _iter_devices_and_ranges(devs: DeviceAccess | DeviceAccessList):
"""
Yield tuples (device, [min, max]) for each underlying device.
Works for:
- DeviceAccess: yields 1 item
- DeviceAccessList: yields N items based on get_devices() and get_range() flattening
"""
# Single device
if hasattr(devs, "get") and hasattr(devs, "get_range") and not hasattr(devs, "get_devices"):
r = devs.get_range()
if r is None:
r = [None, None]
return [(devs, [r[0], r[1]])]
# Device list (expects get_devices() + get_range() flat list)
devices = devs.get_devices()
flat = np.asarray(devs.get_range(), dtype=object).ravel()
if (flat.size % 2) != 0:
raise ValueError(f"dev_range must have an even length, got {flat.size}")
pairs = []
for i, d in enumerate(devices):
pairs.append((d, [flat[2 * i], flat[2 * i + 1]]))
return pairs
[docs]
class CSScalarAggregator(ScalarAggregator):
"""
Basic control system aggregator for a list of scalar values
"""
def __init__(self, devs: DeviceAccessList):
self._devs = devs
[docs]
def add_devices(self, devices: DeviceAccess | list[DeviceAccess]):
self._devs.add_devices(devices)
[docs]
def set(self, value: NDArray[np.float64]):
self._devs.set(value)
[docs]
def set_and_wait(self, value: NDArray[np.float64]):
self._devs.set_and_wait(value)
[docs]
def get(self) -> NDArray[np.float64]:
return self._devs.get()
[docs]
def readback(self) -> np.array:
return self._devs.readback()
[docs]
def unit(self) -> str:
return self._devs.unit()
[docs]
def nb_device(self) -> int:
return self._devs.__len__()
# ------------------------------------------------------------------------------
[docs]
class CSStrengthScalarAggregator(CSScalarAggregator):
"""
Control system aggregator for a list of magnet strengths.
This aggregator is in charge of computing hardware setpoints
and applying them without overlap.
When virtual magnets exported from combined function mangets are present (RWMapper),
the aggregator prevents to apply several times the same power supply setpoint.
"""
def __init__(self, peer: CSScalarAggregator):
CSScalarAggregator.__init__(self, peer._devs)
self.__models: list[MagnetModel] = [] # List of magnet model
self.__modelToMagnet: list[list[tuple[int, int]]] = [] # strengths indexing
self.__nbMagnet = 0 # Number of magnet strengths
[docs]
def add_magnet(self, magnet: Magnet, devs: list[DeviceAccess]):
# Incoming magnet can be a magnet exported from
# a CombinedFunctionMagnet or simple magnet.
# All magnets exported from a same CombinedFunctionMagnet share the same model
# TODO: check that strength is supported (m.strength may be None)
strengthIndex = magnet.strength.index() if isinstance(magnet.strength, abstract.RWMapper) else 0
if magnet.model not in self.__models:
index = len(self.__models)
self.__models.append(magnet.model)
self.__modelToMagnet.append([(self.__nbMagnet, strengthIndex)])
self._devs.add_devices(devs)
else:
index = self.__models.index(magnet.model)
self.__modelToMagnet[index].append((self.__nbMagnet, strengthIndex))
self.__nbMagnet += 1
[docs]
def set(self, value: NDArray[np.float64]):
allHardwareValues = self._devs.get() # Read all hardware setpoints
newHardwareValues = np.zeros(self.nb_device())
hardwareIndex = 0
for modelIndex, model in enumerate(self.__models):
nbDev = len(model.get_devices())
mStrengths = model.compute_strengths(allHardwareValues[hardwareIndex : hardwareIndex + nbDev])
for valueIdx, strengthIdx in self.__modelToMagnet[modelIndex]:
mStrengths[strengthIdx] = value[valueIdx]
newHardwareValues[hardwareIndex : hardwareIndex + nbDev] = model.compute_hardware_values(mStrengths)
hardwareIndex += nbDev
dev_range = self._devs.get_range()
if not check_range(newHardwareValues, dev_range):
raise PyAMLException(format_out_of_range_message(newHardwareValues, self._devs))
self._devs.set(newHardwareValues)
[docs]
def set_and_wait(self, value: NDArray[np.float64]):
raise NotImplementedError("Not implemented yet.")
[docs]
def get(self) -> NDArray[np.float64]:
allHardwareValues = self._devs.get() # Read all hardware setpoints
allStrength = np.zeros(self.__nbMagnet)
hardwareIndex = 0
for modelIndex, model in enumerate(self.__models):
nbDev = len(model.get_devices())
mStrengths = model.compute_strengths(allHardwareValues[hardwareIndex : hardwareIndex + nbDev])
for valueIdx, strengthIdx in self.__modelToMagnet[modelIndex]:
allStrength[valueIdx] = mStrengths[strengthIdx]
hardwareIndex += nbDev
return allStrength
[docs]
def readback(self) -> np.array:
allHardwareValues = self._devs.readback() # Read all hardware readback
allStrength = np.zeros(self.__nbMagnet)
hardwareIndex = 0
for modelIndex, model in enumerate(self.__models):
nbDev = len(model.get_devices())
mStrengths = model.compute_strengths(allHardwareValues[hardwareIndex : hardwareIndex + nbDev])
for valueIdx, strengthIdx in self.__modelToMagnet[modelIndex]:
allStrength[valueIdx] = mStrengths[strengthIdx]
hardwareIndex += nbDev
return allStrength
[docs]
def unit(self) -> str:
return self._devs.unit()
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
[docs]
class RWHardwareScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to a magnet
of a control system (in hardware units)
"""
def __init__(self, model: MagnetModel, dev: DeviceAccess):
self.__model = model
self.__dev = dev
[docs]
def get(self) -> float:
return self.__dev.get()
[docs]
def set(self, value: float):
dev_range = self.__dev.get_range()
if not check_range(value, dev_range):
raise PyAMLException(format_out_of_range_message(value, self.__dev))
self.__dev.set(value)
[docs]
def set_and_wait(self, value: double):
raise NotImplementedError("Not implemented yet.")
[docs]
def unit(self) -> str:
return self.__model.get_hardware_units()[0]
[docs]
def set_magnet_rigidity(self, brho: np.double):
self.__model.set_magnet_rigidity(brho)
# ------------------------------------------------------------------------------
[docs]
class RWStrengthScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to a strength of a control system
"""
def __init__(self, model: MagnetModel, dev: DeviceAccess):
self.__model = model
self.__dev = dev
# Gets the value
[docs]
def get(self) -> float:
current = self.__dev.get()
return self.__model.compute_strengths([current])[0]
# Sets the value
[docs]
def set(self, value: float):
current = self.__model.compute_hardware_values([value])[0]
dev_range = self.__dev.get_range()
if not check_range(current, dev_range):
raise PyAMLException(format_out_of_range_message(current, self.__dev))
self.__dev.set(current)
# Sets the value and wait that the read value reach the setpoint
[docs]
def set_and_wait(self, value: float):
raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs]
def unit(self) -> str:
return self.__model.get_strength_units()[0]
[docs]
def set_magnet_rigidity(self, brho: np.double):
self.__model.set_magnet_rigidity(brho)
# ------------------------------------------------------------------------------
[docs]
class RWHardwareArray(abstract.ReadWriteFloatArray):
"""
Class providing read write access to a magnet array
of a control system (in hardware units)
"""
def __init__(self, model: MagnetModel, devs: list[DeviceAccess]):
self.__model = model
self.__devs = devs
# Gets the value
[docs]
def get(self) -> np.array:
return np.array([p.get() for p in self.__devs])
# Sets the value
[docs]
def set(self, value: np.array):
for idx, p in enumerate(self.__devs):
dev_range = p.get_range()
if not check_range(value[idx], dev_range):
raise PyAMLException(format_out_of_range_message(value[idx], p))
p.set(value[idx])
# Sets the value and waits that the read value reach the setpoint
[docs]
def set_and_wait(self, value: np.array):
raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs]
def unit(self) -> list[str]:
return self.__model.get_hardware_units()
# ------------------------------------------------------------------------------
[docs]
class RWStrengthArray(abstract.ReadWriteFloatArray):
"""
Class providing read write access to magnet strengths of a control system
"""
def __init__(self, model: MagnetModel, devs: list[DeviceAccess]):
self.__model = model
self.__devs = devs
# Gets the value
[docs]
def get(self) -> np.array:
r = np.array([p.get() for p in self.__devs])
str = self.__model.compute_strengths(r)
return str
# Sets the value
[docs]
def set(self, value: np.array):
cur = self.__model.compute_hardware_values(value)
for idx, p in enumerate(self.__devs):
dev_range = p.get_range()
if not check_range(cur[idx], dev_range):
raise PyAMLException(format_out_of_range_message(cur[idx], p))
for idx, p in enumerate(self.__devs):
p.set(cur[idx])
# Sets the value and waits that the read value reach the setpoint
[docs]
def set_and_wait(self, value: np.array):
raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs]
def unit(self) -> list[str]:
return self.__model.get_strength_units()
# ------------------------------------------------------------------------------
[docs]
class RBpmArray(abstract.ReadFloatArray):
"""
Class providing read access to a BPM position [x,y] of a control system
"""
def __init__(self, hDev: DeviceAccess, vDev: DeviceAccess):
self._hDev = hDev
self._vDev = vDev
[docs]
def get(self) -> np.array:
return np.array([self._hDev.get(), self._vDev.get()])
# Gets the unit of the value Assume that x and y, offsets and positions
# have the same unit
[docs]
def unit(self) -> str:
return self._hDev.unit()
# ------------------------------------------------------------------------------
[docs]
class RWBpmTiltScalar(abstract.ReadFloatScalar):
"""
Class providing read access to a BPM tilt of a control system
"""
def __init__(self, dev: DeviceAccess):
self._dev = dev
[docs]
def get(self) -> float:
return self._dev.get()
[docs]
def set(self, value: float):
self._dev.set(value)
[docs]
def set_and_wait(self, value: NDArray[np.float64]):
raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value
[docs]
def unit(self) -> str:
return self._dev.unit()
# ------------------------------------------------------------------------------
[docs]
class RWBpmOffsetArray(abstract.ReadWriteFloatArray):
"""
Class providing read write access to a BPM offset [x,y] of a control system
"""
def __init__(self, hDev: DeviceAccess, vDev: DeviceAccess):
self._hDev = hDev
self._vDev = vDev
[docs]
def get(self) -> np.array:
return np.array([self._hDev.get(), self._vDev.get()])
[docs]
def set(self, value: NDArray[np.float64]):
self._hDev.set(value[0])
self._vDev.set(value[1])
[docs]
def set_and_wait(self, value: NDArray[np.float64]):
raise NotImplementedError("Not implemented yet.")
# Gets the unit of the value Assume that x and y, offsets and positions
# have the same unit
[docs]
def unit(self) -> str:
return self._hDev.unit()
# ------------------------------------------------------------------------------
[docs]
class RWRFVoltageScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to cavity voltage
for a transmitter of a control system.
"""
def __init__(self, transmitter: RFTransmitter, dev: DeviceAccess):
self.__transmitter = transmitter
self.__dev = dev
[docs]
def get(self) -> float:
return self.__dev.get()
[docs]
def set(self, value: float):
self.__dev.set(value)
[docs]
def set_and_wait(self, value: float):
raise NotImplementedError("Not implemented yet.")
[docs]
def unit(self) -> str:
return self.__transmitter._cfg.voltage.unit()
# ------------------------------------------------------------------------------
[docs]
class RWRFPhaseScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to cavity phase
for a transmitter of a control system.
"""
def __init__(self, transmitter: RFTransmitter, dev: DeviceAccess):
self.__transmitter = transmitter
self.__dev = dev
[docs]
def get(self) -> float:
return self.__dev.get()
[docs]
def set(self, value: float):
self.__dev.set(value)
[docs]
def set_and_wait(self, value: float):
raise NotImplementedError("Not implemented yet.")
[docs]
def unit(self) -> str:
return self.__transmitter._cfg.phase.unit()
# ------------------------------------------------------------------------------
[docs]
class RWRFFrequencyScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to RF frequency of a control system.
"""
def __init__(self, rf: RFPlant, dev: DeviceAccess):
self.__rf = rf
self.__dev = dev
[docs]
def get(self) -> float:
# Serialized cavity has the same frequency
return self.__dev.get()
[docs]
def set(self, value: float):
self.__dev.set(value)
[docs]
def set_and_wait(self, value: float):
raise NotImplementedError("Not implemented yet.")
[docs]
def unit(self) -> str:
return self.__rf._cfg.masterclock.unit()
# ------------------------------------------------------------------------------
[docs]
class RBetatronTuneArray(abstract.ReadFloatArray):
"""
Class providing read write access to betatron tune of a control system.
"""
def __init__(self, tune_monitor, devs: list[DeviceAccess]):
self.__tune_monitor = tune_monitor
self.__devs = devs
[docs]
def get(self) -> NDArray:
# Return horizontal and vertical betatron tunes as a NumPy array
return np.array(
[
self.__devs[0].get(),
self.__devs[1].get(),
]
)
[docs]
def unit(self) -> str:
return self.__tune_monitor._cfg.tune_v.unit()
# ------------------------------------------------------------------------------