from abc import ABCMeta, abstractmethod
from typing import Tuple
from ..bpm.bpm import BPM
from ..bpm.bpm_model import BPMModel
from ..common.abstract import RWMapper
from ..common.abstract_aggregator import ScalarAggregator
from ..common.element import Element
from ..common.element_holder import ElementHolder
from ..common.exception import PyAMLException
from ..configuration.factory import Factory
from ..configuration.unbound_element import UnboundElement
from ..control.abstract_impl import (
CSBPMArrayMapper,
CSScalarAggregator,
CSStrengthScalarAggregator,
RBetatronTuneArray,
RBpmArray,
RWBpmOffsetArray,
RWBpmTiltScalar,
RWHardwareArray,
RWHardwareScalar,
RWRFFrequencyScalar,
RWRFPhaseScalar,
RWRFVoltageScalar,
RWStrengthArray,
RWStrengthScalar,
)
from ..diagnostics.atune_monitor import ABetatronTuneMonitor
from ..diagnostics.tune_monitor import BetatronTuneMonitor
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from ..magnet.magnet import Magnet
from ..magnet.serialized_magnet import SerializedMagnets
from ..rf.rf_plant import RFPlant, RWTotalVoltage
from ..rf.rf_transmitter import RFTransmitter
from ..tuning_tools.measurement_tool import MeasurementTool
from ..tuning_tools.tuning_tool import TuningTool
from .deviceaccess import DeviceAccess
[docs]
class ControlSystem(ElementHolder, metaclass=ABCMeta):
"""
Abstract class providing access to a control system float variable
"""
def __init__(self):
ElementHolder.__init__(self)
[docs]
@abstractmethod
def attach(self, dev: list[DeviceAccess]) -> list[DeviceAccess]:
"""Return new instances of DeviceAccess objects
coming from configuration attached to this CS"""
pass
[docs]
@abstractmethod
def attach_array(self, dev: list[DeviceAccess]) -> list[DeviceAccess]:
"""Return new instances of DeviceAccess objects
coming from configuration attached to this CS"""
pass
[docs]
@abstractmethod
def name(self) -> str:
"""Return control system name (i.e. live)"""
pass
[docs]
@abstractmethod
def scalar_aggregator(self) -> str | None:
"""Returns the module name used for handling aggregator of DeviceAccess"""
return None
[docs]
@abstractmethod
def vector_aggregator(self) -> str | None:
"""Returns the module name used for handling aggregator of DeviceVectorAccess"""
return None
[docs]
def attach_indexed(self, dev: DeviceAccess, idx: int | None) -> DeviceAccess:
if idx is not None:
return self.attach_array([dev])[0]
else:
return self.attach([dev])[0]
[docs]
def create_scalar_aggregator(self) -> ScalarAggregator:
mod = self.scalar_aggregator()
agg = Factory.build_object({"type": mod}) if mod is not None else None
return CSScalarAggregator(agg)
[docs]
def create_magnet_strength_aggregator(self, magnets: list[Magnet]) -> ScalarAggregator:
agg = CSStrengthScalarAggregator(self.create_scalar_aggregator())
for m in magnets:
devs = self.attach(m.model.get_devices())
agg.add_magnet(m, devs)
return agg
[docs]
def create_magnet_hardware_aggregator(self, magnets: list[Magnet]) -> ScalarAggregator:
"""When working in hardware space, 1 single power
supply device per multipolar strength is required
"""
agg = self.create_scalar_aggregator()
for m in magnets:
if not m.model.has_hardware():
return None
psIndex = m.hardware.index() if isinstance(m.hardware, RWMapper) else 0
agg.add_devices(self.attach([m.model.get_devices()[psIndex]])[0])
return agg
[docs]
def create_bpm_aggregators(self, bpms: list[BPM]) -> list[ScalarAggregator]:
# return [None,None,None]
if any([not b.model.is_pos_indexed() for b in bpms]):
# Aggregator for single BPM (all values are scalar)
agg = self.create_scalar_aggregator()
aggh = self.create_scalar_aggregator()
aggv = self.create_scalar_aggregator()
for b in bpms:
devs = self.attach(b.model.get_pos_devices())
agg.add_devices(devs)
aggh.add_devices(devs[0])
aggv.add_devices(devs[1])
return [agg, aggh, aggv]
elif any([b.model.is_pos_indexed() for b in bpms]):
# Aggregator for indexed BPMs
allH = []
hIdx = []
allV = []
vIdx = []
allHV = []
for b in bpms:
devs = self.attach_array(b.model.get_pos_devices())
devH = devs[0]
devV = devs[1]
if devH not in allH:
allH.append(devH)
if devH not in allHV:
allHV.append(devH)
if devV not in allV:
allV.append(devV)
if devV not in allHV:
allHV.append(devV)
hIdx.append(b.model.x_pos_index())
vIdx.append(b.model.y_pos_index())
if len(allH) > 1 or len(allV) > 1:
# Does not support aggregator for individual BPM that
# returns an array of [x,y]
print("Warning, Individual BPM that returns [x,y]" + " are not read in parralell")
# Default to serialized readding
return [None, None, None]
if devH == devV:
# [x0,y0,x1,y0,....]
idx = []
for b in bpms:
idx.append(b.model.x_pos_index())
idx.append(b.model.y_pos_index())
hvIdx = [idx]
else:
hvIdx = [hIdx, vIdx]
agg = CSBPMArrayMapper(allHV, hvIdx)
aggh = CSBPMArrayMapper(allH, [hIdx])
aggv = CSBPMArrayMapper(allV, [vIdx])
return [agg, aggh, aggv]
else:
raise PyAMLException("Indexed BPM and scalar values cannot be mixed in the same array")
[docs]
def fill_device(self, elements: list[Element]):
"""
Fill device of this control system with Element
coming from the configuration file
Parameters
----------
elements : list[Element]
List of elements coming from the configuration
file to attach to this control system
"""
for e in elements:
if isinstance(e, Magnet):
dev = self.attach(e.model.get_devices())[0]
current = RWHardwareScalar(e.model, dev) if e.model.has_hardware() else None
strength = RWStrengthScalar(e.model, dev) if e.model.has_physics() else None
# Create a unique ref for this control system
m = e.attach(self, strength, current)
self.add_magnet(m)
elif isinstance(e, CombinedFunctionMagnet):
devs = self.attach(e.model.get_devices())
currents = RWHardwareArray(e.model, devs)
strengths = RWStrengthArray(e.model, devs)
# Create unique refs the cfm and
# each of its function for this control system
ms = e.attach(self, strengths, currents)
self.add_cfm_magnet(ms[0])
for m in ms[1:]:
self.add_magnet(m)
elif isinstance(e, SerializedMagnets):
devs = self.attach(e.model.get_devices())
currents = []
strengths = []
# Create unique refs the series and each of its function for this
# control system
for i in range(e.get_nb_magnets()):
current = RWHardwareScalar(e.model.get_sub_model(i), devs[i]) if e.model.has_hardware() else None
strength = RWStrengthScalar(e.model.get_sub_model(i), devs[i]) if e.model.has_physics() else None
currents.append(current)
strengths.append(strength)
ms = e.attach(self, strengths, currents)
self.add_serialized_magnet(ms[0])
for m in ms[1:]:
self.add_magnet(m)
elif isinstance(e, BPM):
hDev = e.model.get_pos_devices()[0]
vDev = e.model.get_pos_devices()[1]
tiltDev = e.model.get_tilt_device()
hOffsetDev = e.model.get_offset_devices()[0]
vOffsetDev = e.model.get_offset_devices()[1]
ahDev = self.attach_indexed(hDev, e.model.x_pos_index())
avDev = self.attach_indexed(vDev, e.model.y_pos_index())
atiltDev = self.attach_indexed(tiltDev, e.model.tilt_index())
ahOffsetDev = self.attach_indexed(hOffsetDev, e.model.x_offset_index())
avOffsetDev = self.attach_indexed(vOffsetDev, e.model.y_offset_index())
positions = RBpmArray(e.model, ahDev, avDev)
tilt = RWBpmTiltScalar(e.model, atiltDev)
offsets = RWBpmOffsetArray(e.model, ahOffsetDev, avOffsetDev)
e = e.attach(self, positions, offsets, tilt)
self.add_bpm(e)
elif isinstance(e, RFPlant):
attachedTrans: list[RFTransmitter] = []
if e._cfg.transmitters:
for t in e._cfg.transmitters:
vDev = self.attach([t._cfg.voltage])[0]
pDev = self.attach([t._cfg.phase])[0]
voltage = RWRFVoltageScalar(t, vDev)
phase = RWRFPhaseScalar(t, pDev)
nt = t.attach(self, voltage, phase)
self.add_rf_transnmitter(nt)
attachedTrans.append(nt)
fDev = self.attach([e._cfg.masterclock])[0]
frequency = RWRFFrequencyScalar(e, fDev)
voltage = RWTotalVoltage(attachedTrans) if e._cfg.transmitters else None
ne = e.attach(self, frequency, voltage)
self.add_rf_plant(ne)
elif isinstance(e, BetatronTuneMonitor):
# Built in tune monitor
tuneDevs = self.attach([e._cfg.tune_h, e._cfg.tune_v])
betatron_tune = RBetatronTuneArray(e, tuneDevs)
e = e.attach(self, betatron_tune)
self.add_betatron_tune_monitor(e)
elif isinstance(e, TuningTool) | isinstance(e, MeasurementTool):
self.add_tool(e.attach(self))
elif isinstance(e, UnboundElement):
if self.name() in e._control_modes:
ne = Factory.build_unbound(e, self)
if isinstance(ne, ABetatronTuneMonitor):
self.add_betatron_tune_monitor(ne)
else:
# Default to standard Element
self.add_element(ne)
[docs]
class ControlSystemAdapter(ControlSystem):
"""
Control system adapter class
"""
def __init__(self):
ControlSystem.__init__(self)
[docs]
def attach(self, dev: list[DeviceAccess]) -> list[DeviceAccess]:
pass
[docs]
def attach_array(self, dev: list[DeviceAccess]) -> list[DeviceAccess]:
pass
[docs]
def name(self) -> str:
pass
[docs]
def scalar_aggregator(self) -> str | None:
return None
[docs]
def vector_aggregator(self) -> str | None:
return None