from abc import ABCMeta, abstractmethod
from pydantic import BaseModel
from ..bpm.bpm import BPM
from ..common.abstract import RWMapper
from ..common.abstract_aggregator import ScalarAggregator
from ..common.element import Element
from ..common.element_holder import ElementHolder
from ..configuration.factory import Factory
from ..configuration.unbound_element import UnboundElement
from ..control.abstract_impl import (
CSScalarAggregator,
CSStrengthScalarAggregator,
RBetatronTuneArray,
RBpmArray,
RWBpmOffsetArray,
RWBpmTiltScalar,
RWHardwareArray,
RWHardwareScalar,
RWRFFrequencyScalar,
RWRFPhaseScalar,
RWRFVoltageScalar,
RWStrengthArray,
RWStrengthScalar,
)
from ..diagnostics.atune_monitor import ABetatronTuneMonitor
from ..diagnostics.tune_monitor import BetatronTuneMonitor
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from ..magnet.magnet import Magnet
from ..magnet.serialized_magnet import SerializedMagnets
from ..rf.rf_plant import RFPlant, RWTotalVoltage
from ..rf.rf_transmitter import RFTransmitter
from ..tuning_tools.measurement_tool import MeasurementTool
from ..tuning_tools.tuning_tool import TuningTool
from .deviceaccess import DeviceAccess
[docs]
class ControlSystem(ElementHolder, metaclass=ABCMeta):
    """
    Abstract base class for a control system backend.

    A control system is an :class:`ElementHolder`: :meth:`fill_device`
    attaches configured elements to this control system and registers the
    objects returned by their ``attach`` methods on it. Concrete backends
    implement the abstract methods below (device attachment, naming,
    aggregator selection and device lookup).
    """

    def __init__(self):
        ElementHolder.__init__(self)

    @abstractmethod
    def attach(self, dev: list[DeviceAccess | None]) -> list[DeviceAccess | None]:
        """
        Return new instances of DeviceAccess objects coming from the
        configuration, attached to this control system.

        Parameters
        ----------
        dev : list[DeviceAccess | None]
            Devices coming from the configuration

        Returns
        -------
        list[DeviceAccess | None]
            New device instances attached to this control system
        """
        pass

    @abstractmethod
    def attach_array(self, dev: list[DeviceAccess | None]) -> list[DeviceAccess | None]:
        """
        Return new instances of DeviceAccess objects coming from the
        configuration, attached to this control system.

        Parameters
        ----------
        dev : list[DeviceAccess | None]
            Devices coming from the configuration

        Returns
        -------
        list[DeviceAccess | None]
            New device instances attached to this control system
        """
        pass

    @abstractmethod
    def name(self) -> str:
        """Return control system name (i.e. live)"""
        pass

    @abstractmethod
    def scalar_aggregator(self) -> str | None:
        """
        Return the module name used for handling aggregator of DeviceAccess.

        ``None`` means that no aggregator module is built by
        :meth:`create_scalar_aggregator`.
        """
        return None

    @abstractmethod
    def vector_aggregator(self) -> str | None:
        """Return the module name used for handling aggregator of DeviceVectorAccess"""
        return None

    @abstractmethod
    def get_device(self, ref: str | BaseModel | None) -> DeviceAccess:
        """
        Return a device reference for this control system.

        YAML element configuration passes opaque strings. Public Python APIs may
        also pass backend ConfigModel instances. Concrete backends own all
        lookup, parsing and DeviceAccess construction.

        Parameters
        ----------
        ref : str | BaseModel | None
            Reference of the device to look up
        """
        pass

    def get_devices(self, refs: list[str | BaseModel | None]) -> list[DeviceAccess]:
        """
        Return the device references of this control system for a list of refs.

        Each entry of ``refs`` is resolved with :meth:`get_device`; the
        returned list has the same length and order as ``refs``.

        Parameters
        ----------
        refs : list[str | BaseModel | None]
            References of the devices to look up
        """
        return [self.get_device(ref) for ref in refs]

    def create_scalar_aggregator(self) -> ScalarAggregator:
        """
        Create an empty scalar aggregator for this control system.

        The backend object is built through the Factory from the module name
        given by :meth:`scalar_aggregator`; when that name is ``None`` the
        returned CSScalarAggregator wraps ``None``.
        """
        mod = self.scalar_aggregator()
        agg = Factory.build_object({"type": mod}) if mod is not None else None
        return CSScalarAggregator(agg)

    def create_magnet_strength_aggregator(self, magnets: list[Magnet]) -> ScalarAggregator:
        """
        Create a strength aggregator covering the given magnets.

        Parameters
        ----------
        magnets : list[Magnet]
            Magnets whose model devices are attached to this control system
            and added to the aggregator
        """
        agg = CSStrengthScalarAggregator(self.create_scalar_aggregator())
        for m in magnets:
            devs = self.attach(m.model.get_devices())
            agg.add_magnet(m, devs)
        return agg

    def create_magnet_hardware_aggregator(self, magnets: list[Magnet]) -> ScalarAggregator | None:
        """
        Create a hardware aggregator covering the given magnets.

        When working in hardware space, 1 single power
        supply device per multipolar strength is required

        Parameters
        ----------
        magnets : list[Magnet]
            Magnets to aggregate

        Returns
        -------
        ScalarAggregator | None
            The aggregator, or ``None`` as soon as one magnet has a model
            without hardware support
        """
        agg = self.create_scalar_aggregator()
        for m in magnets:
            if not m.model.has_hardware():
                # No aggregation possible in hardware space for this set
                return None
            # A mapped hardware accessor tells which power supply to use,
            # otherwise the first device of the model is taken
            ps_index = m.hardware.index() if isinstance(m.hardware, RWMapper) else 0
            agg.add_devices(self.attach([m.model.get_devices()[ps_index]])[0])
        return agg

    def create_bpm_aggregators(self, bpms: list[BPM]) -> list[ScalarAggregator]:
        """
        Create the position aggregators for the given BPMs.

        Parameters
        ----------
        bpms : list[BPM]
            BPMs whose position devices are aggregated

        Returns
        -------
        list[ScalarAggregator]
            Three aggregators: one with all position devices of each BPM,
            one with the first position device of each BPM and one with the
            second (presumably horizontal and vertical - to be confirmed)
        """
        agg = self.create_scalar_aggregator()
        aggh = self.create_scalar_aggregator()
        aggv = self.create_scalar_aggregator()
        for b in bpms:
            devs = self.get_devices(b.model.get_pos_devices())
            agg.add_devices(devs)
            aggh.add_devices(devs[0])
            aggv.add_devices(devs[1])
        return [agg, aggh, aggv]

    def fill_device(self, elements: list[Element]) -> None:
        """
        Fill device of this control system with Element
        coming from the configuration file

        Parameters
        ----------
        elements : list[Element]
            List of elements coming from the configuration
            file to attach to this control system
        """
        for e in elements:
            if isinstance(e, Magnet):
                dev = self.attach(e.model.get_devices())[0]
                current = RWHardwareScalar(e.model, dev) if e.model.has_hardware() else None
                strength = RWStrengthScalar(e.model, dev) if e.model.has_physics() else None
                # Create a unique ref for this control system
                m = e.attach(self, strength, current)
                self.add_magnet(m)

            elif isinstance(e, CombinedFunctionMagnet):
                devs = self.attach(e.model.get_devices())
                currents = RWHardwareArray(e.model, devs)
                strengths = RWStrengthArray(e.model, devs)
                # Create unique refs the cfm and
                # each of its function for this control system
                ms = e.attach(self, strengths, currents)
                # First returned item is the cfm itself, the others are
                # registered as plain magnets
                self.add_cfm_magnet(ms[0])
                for m in ms[1:]:
                    self.add_magnet(m)

            elif isinstance(e, SerializedMagnets):
                devs = self.attach(e.model.get_devices())
                currents = []
                strengths = []
                # Create unique refs the series and each of its function for this
                # control system
                for i in range(e.get_nb_magnets()):
                    current = RWHardwareScalar(e.model.get_sub_model(i), devs[i]) if e.model.has_hardware() else None
                    strength = RWStrengthScalar(e.model.get_sub_model(i), devs[i]) if e.model.has_physics() else None
                    currents.append(current)
                    strengths.append(strength)
                ms = e.attach(self, strengths, currents)
                # First returned item is the series itself, the others are
                # registered as plain magnets
                self.add_serialized_magnet(ms[0])
                for m in ms[1:]:
                    self.add_magnet(m)

            elif isinstance(e, BPM):
                pos_devs = self.get_devices(e.model.get_pos_devices())
                tilt_devs = self.get_devices([e.model.get_tilt_device()])
                offset_devs = self.get_devices(e.model.get_offset_devices())
                positions = RBpmArray(pos_devs[0], pos_devs[1])
                tilt = RWBpmTiltScalar(tilt_devs[0])
                offsets = RWBpmOffsetArray(offset_devs[0], offset_devs[1])
                bpm = e.attach(self, positions, offsets, tilt)
                self.add_bpm(bpm)

            elif isinstance(e, RFPlant):
                attached_trans: list[RFTransmitter] = []
                if e._cfg.transmitters:
                    for t in e._cfg.transmitters:
                        v_dev = self.attach([t._cfg.voltage])[0]
                        p_dev = self.attach([t._cfg.phase])[0]
                        voltage = RWRFVoltageScalar(t, v_dev)
                        phase = RWRFPhaseScalar(t, p_dev)
                        nt = t.attach(self, voltage, phase)
                        # NOTE(review): the spelling 'transnmitter' has to
                        # match the holder method name - confirm before renaming
                        self.add_rf_transnmitter(nt)
                        attached_trans.append(nt)
                f_dev = self.attach([e._cfg.masterclock])[0]
                frequency = RWRFFrequencyScalar(e, f_dev)
                # Total voltage is only available when transmitters are configured
                total_voltage = RWTotalVoltage(attached_trans) if e._cfg.transmitters else None
                ne = e.attach(self, frequency, total_voltage)
                self.add_rf_plant(ne)

            elif isinstance(e, BetatronTuneMonitor):
                # Built in tune monitor
                tune_devs = self.attach([e._cfg.tune_h, e._cfg.tune_v])
                betatron_tune = RBetatronTuneArray(e, tune_devs)
                monitor = e.attach(self, betatron_tune)
                self.add_betatron_tune_monitor(monitor)

            elif isinstance(e, (TuningTool, MeasurementTool)):
                self.add_tool(e.attach(self))

            elif isinstance(e, UnboundElement):
                # Only instantiate the element when it is declared for
                # this control system
                if self.name() in e._control_modes:
                    ne = e.instantiate(self)
                    if isinstance(ne, ABetatronTuneMonitor):
                        self.add_betatron_tune_monitor(ne)
                    else:
                        # Default to standard Element
                        self.add_element(ne)
[docs]
class ControlSystemAdapter(ControlSystem):
    """
    Control system adapter class

    Overrides the abstract methods declared by ControlSystem (attach,
    attach_array, name, scalar_aggregator, vector_aggregator and get_device)
    with stubs that all return ``None``.
    """

    def __init__(self):
        ControlSystem.__init__(self)

    def attach(self, dev: list[DeviceAccess | None]) -> list[DeviceAccess | None]:
        """Stub implementation: ignores ``dev`` and returns ``None``."""
        return None

    def attach_array(self, dev: list[DeviceAccess | None]) -> list[DeviceAccess | None]:
        """Stub implementation: ignores ``dev`` and returns ``None``."""
        return None

    def name(self) -> str:
        """Stub implementation: returns ``None``."""
        return None

    def scalar_aggregator(self) -> str | None:
        """Stub implementation: no scalar aggregator module (``None``)."""
        return None

    def vector_aggregator(self) -> str | None:
        """Stub implementation: no vector aggregator module (``None``)."""
        return None

    def get_device(self, ref: str | BaseModel | None) -> DeviceAccess | None:
        """Stub implementation: ignores ``ref`` and returns ``None``."""
        return None