import logging
from pathlib import Path
from typing import TYPE_CHECKING, Literal, Optional, Union
try:
from typing import Self # Python 3.11+
except ImportError:
from typing_extensions import Self # Python 3.10 and earlier
import numpy as np
from pydantic import ConfigDict
if TYPE_CHECKING:
from ..common.element_holder import ElementHolder
from pySC import ResponseMatrix as pySC_ResponseMatrix
from pySC.apps import orbit_correction
from ..arrays.magnet_array import MagnetArray
from ..common.element import Element, ElementConfigModel
from ..common.exception import PyAMLException
from ..external.pySC_interface import pySCInterface
from ..rf.rf_plant import RFPlant
from .orbit_response_matrix_data import OrbitResponseMatrixData
from .tuning_tool import TuningTool
logger = logging.getLogger(__name__)
logging.getLogger("pyaml.external.pySC").setLevel(logging.WARNING)
PYAMLCLASS = "Orbit"
[docs]
class ConfigModel(ElementConfigModel):
model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")
bpm_array_name: str
hcorr_array_name: str
vcorr_array_name: str
rf_plant_name: Optional[str] = None
singular_values: Optional[int] = None
singular_values_H: Optional[int] = None
singular_values_V: Optional[int] = None
virtual_target: float = 0
response_matrix: Union[str, OrbitResponseMatrixData]
[docs]
class Orbit(TuningTool):
def __init__(self, cfg: ConfigModel):
super().__init__(cfg.name)
self._cfg = cfg
self.bpm_array_name = cfg.bpm_array_name
self.hcorr_array_name = cfg.hcorr_array_name
self.vcorr_array_name = cfg.vcorr_array_name
self._pySC_response_matrix = None
self.virtual_target = cfg.virtual_target
if cfg.singular_values is None:
if cfg.singular_values_H is None or cfg.singular_values_V is None:
raise PyAMLException(
"Either `singular_values` or `singular_values_H` and `singular_values_V` must be provided."
)
self.singular_values_H = cfg.singular_values_H
self.singular_values_V = cfg.singular_values_V
else:
if cfg.singular_values_H is not None or cfg.singular_values_V is not None:
raise PyAMLException(
"Either `singular_values` or `singular_values_H` and `singular_values_V` must be provided, not both."
)
self.singular_values_H = cfg.singular_values
self.singular_values_V = cfg.singular_values
# If the configuration response matrix is a filename, load it
if type(cfg.response_matrix) is str:
try:
cfg.response_matrix = OrbitResponseMatrixData.load(cfg.response_matrix)
except Exception as e:
logger.warning(f"Loading {cfg.response_matrix} failed {str(e)}")
cfg.response_matrix = None
# Converts to self._pySC_response_matrix
if cfg.response_matrix:
self._set_response_matrix(cfg.response_matrix)
self._hcorr: MagnetArray = None
self._vcorr: MagnetArray = None
self._hvcorr: MagnetArray = None
self._rf_plant: RFPlant = None
[docs]
def load(self, load_path: Path):
"""
Dynamically loads a response matrix.
Parameters
----------
load_path : Path
Filename of the :class:`~.OrbitResponseMatrixData` to load
"""
self._cfg.response_matrix = OrbitResponseMatrixData.load(load_path)
self._set_response_matrix(self._cfg.response_matrix)
def _set_response_matrix(self, mat):
m = mat._cfg.model_dump()
m["input_names"] = m.pop("variable_names")
m["output_names"] = m.pop("observable_names")
m["input_planes"] = m.pop("variable_planes")
m["output_planes"] = m.pop("observable_planes")
self._cfg.response_matrix = mat
self._pySC_response_matrix = pySC_ResponseMatrix.model_validate(m)
@property
def response_matrix(self) -> OrbitResponseMatrixData | None:
"""
Return the response matrix if it has been loaded None otherwise
"""
return self._cfg.response_matrix
[docs]
def correct(
self,
plane: Optional[Literal["H", "V"]] = None,
gain: float = 1.0,
gain_H: Optional[float] = None,
gain_V: Optional[float] = None,
gain_RF: Optional[float] = None,
singular_values_H: Optional[int] = None,
singular_values_V: Optional[int] = None,
reference: Optional[np.ndarray] = None,
rf: bool = False,
virtual_target: Optional[float] = None,
):
"""
Perform orbit correction using the configured response matrix and corrector
arrays.
Parameters
----------
reference : optional
Optional reference orbit to correct towards. If not specified, corrects
to zero orbit.
gain : float, default 1.0
Global gain applied to all corrector kicks if per-plane gains are not
specified.
plane : {'H', 'V'}, optional
Plane to correct. If 'H', only horizontal correction is performed.
If 'V', only vertical correction is performed.
If None (default), both planes are corrected.
gain_H : float, optional
Gain for the horizontal plane. Overrides `gain` for H-plane if specified.
gain_V : float, optional
Gain for the vertical plane. Overrides `gain` for V-plane if specified.
gain_RF : optional
Gain for the correction with the rf frequency. If not specified,
the gain of the horizontal plane is used.
singular_values_H : int, optional
Number of singular values to use for SVD decomposition in the horizontal
plane. If not specified, uses the default or configured value.
singular_values_V : int, optional
Number of singular values to use for SVD decomposition in the vertical
plane. If not specified, uses the default or configured value.
rf : bool, default False,
If set to true, the rf_response will also be used in the response matrix
for correction of the horizontal orbit. Only takes into effect if plane is
None or if plane = 'H'.
"""
if self._pySC_response_matrix is None:
raise PyAMLException(f"{self.get_name()} does not have a response_matrix.")
interface = pySCInterface(
element_holder=self.peer,
bpm_array_name=self.bpm_array_name,
)
if singular_values_H is not None:
svH = singular_values_H
else:
svH = self.singular_values_H
if singular_values_V is not None:
svV = singular_values_V
else:
svV = self.singular_values_V
if virtual_target is None:
virtual_target = self.virtual_target
if plane is None or plane == "H":
trims_h = orbit_correction(
interface=interface,
response_matrix=self._pySC_response_matrix,
method="svd_values",
parameter=svH,
virtual=True,
apply=False,
plane="H",
reference=reference,
rf=rf,
virtual_target=virtual_target,
)
if plane is None or plane == "V":
trims_v = orbit_correction(
interface=interface,
response_matrix=self._pySC_response_matrix,
method="svd_values",
parameter=svV,
virtual=False,
apply=False,
plane="V",
reference=reference,
rf=False,
)
eff_gain_H = gain_H if gain_H is not None else gain
eff_gain_V = gain_V if gain_V is not None else gain
# take care of rf trim
rf_flag = rf and (plane is None or plane == "H")
if rf_flag:
if self._rf_plant is None:
raise PyAMLException("RF plant is not defined!")
eff_gain_RF = gain_RF if gain_RF is not None else eff_gain_H
## pySC returns with an 'rf' entry into the dictionary if rf=True
rf_trim = eff_gain_RF * trims_h["rf"]
del trims_h["rf"]
# collect all trims and apply gain
if plane is None:
for trim in trims_h:
trims_h[trim] *= eff_gain_H
for trim in trims_v:
trims_v[trim] *= eff_gain_V
trims = {**trims_h, **trims_v}
corr_array = self._hvcorr
elif plane == "H":
for trim in trims_h:
trims_h[trim] *= eff_gain_H
trims = trims_h
corr_array = self._hcorr
elif plane == "V":
for trim in trims_v:
trims_v[trim] *= eff_gain_V
trims = trims_v
corr_array = self._vcorr
corrector_names = corr_array.names()
corrector_to_index = {name: idx for idx, name in enumerate(corrector_names)}
data_to_send = corr_array.strengths.get()
for name in trims.keys():
idx = corrector_to_index.get(name, None)
if idx is None:
raise PyAMLException(
f"Corrector {name} not found in the magnet array for orbit corr. "
"Possible inconcistency between corrector arrays and "
"response matrix."
)
data_to_send[idx] += trims[name]
# send trims
corr_array.strengths.set(data_to_send)
if rf_flag:
rf_frequency = self._rf_plant.frequency.get()
self._rf_plant.frequency.set(rf_frequency + rf_trim)
return
[docs]
def set_weight(self, name: str, weight: float, plane: Optional[Literal["H", "V"]] = None) -> None:
self._pySC_response_matrix.set_weight(name, weight, plane=plane)
return
[docs]
def set_virtual_weight(self, weight: float) -> None:
self._pySC_response_matrix.virtual_weight = weight
return
[docs]
def set_rf_weight(self, weight: float) -> None:
self._pySC_response_matrix.rf_weight = weight
return
[docs]
def get_weight(self, name: str, plane: Optional[Literal["H", "V"]] = None) -> float:
names = []
planes = []
weights = []
inames = self._pySC_response_matrix.input_names
iplanes = self._pySC_response_matrix.input_planes
iweights = self._pySC_response_matrix.input_weights
for iname, iplane, iw in zip(inames, iplanes, iweights, strict=True):
if name == iname:
if plane is None or plane == iplane:
names.append(iname)
planes.append(iplane)
weights.append(iw)
onames = self._pySC_response_matrix.output_names
oplanes = self._pySC_response_matrix.output_planes
oweights = self._pySC_response_matrix.output_weights
for oname, oplane, ow in zip(onames, oplanes, oweights, strict=True):
if name == oname:
if plane is None or plane == oplane:
names.append(oname)
planes.append(oplane)
weights.append(ow)
if len(weights) == 1:
return weights[0]
else:
raise PyAMLException(f"More than one weight found, please select plane. {names=}, {planes=}, {weights=}")
[docs]
def get_virtual_weight(self) -> float:
return self._pySC_response_matrix.virtual_weight
[docs]
def get_rf_weight(self) -> float:
return self._pySC_response_matrix.rf_weight
[docs]
def post_init(self):
self._hcorr = self.peer.get_magnets(self._cfg.hcorr_array_name)
self._vcorr = self.peer.get_magnets(self._cfg.vcorr_array_name)
hvElts = []
hvElts.extend(self._hcorr)
hvElts.extend(self._vcorr)
self._hvcorr = MagnetArray("", hvElts)
if self._cfg.rf_plant_name is not None:
self._rf_plant = self.peer.get_rf_plant(self._cfg.rf_plant_name)