Source code for pyaml.tuning_tools.measurement_tool
import logging
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Optional, Self
from pydantic import ConfigDict
from ..common.constants import Action
from ..common.element import Element, ElementConfigModel
from ..common.exception import PyAMLException
if TYPE_CHECKING:
from ..common.element_holder import ElementHolder
logger = logging.getLogger(__name__)
[docs]
class MeasurementToolConfigModel(ElementConfigModel):
"""
Measurement tool configuration model
Parameters
----------
n_step: int, optional
Number of measurement step [-delta/n_step..delta/n_step]
Default 1
sleep_between_step: float, optional
Default sleep time after an actuator excitation
Default: 0
n_avg_meas : int, optional
Default number of measurement per step used for averaging
Default 1
sleep_between_meas: float, optional
Default sleep time between two measurments
Default: 0
"""
model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")
n_step: Optional[int] = 1
sleep_between_step: Optional[float] = 0
n_avg_meas: Optional[int] = 1
sleep_between_meas: Optional[float] = 0
[docs]
class MeasurementTool(Element, metaclass=ABCMeta):
"""
Base class for measurement tool such as reponse matrix measurement or other scans.
"""
def __init__(self, name):
super().__init__(name)
self._latest_measurement: dict = None
"""
"""
self._peer: "ElementHolder" = None # Peer: ControlSystem or Simulator
self._callback: Callable = None
def _init_measure(self, measurement_type: str | None = None):
# Initialize measurement data
self._latest_measurement = {}
if measurement_type is not None:
self._latest_measurement["type"] = measurement_type
[docs]
@abstractmethod
def measure(self) -> bool:
"""
Launch measurement
Returns
-------
bool
True if the process has been aborted
"""
raise NotImplementedError()
@property
def latest_measurement(self) -> dict:
"""
Return last measurement data, a dictionary containing last measurement data.
See sub class of MeasurementTool to get description.
Returns
-------
dict
Return latest measurement or None
"""
return self._latest_measurement
[docs]
def get(self) -> dict:
"""
Return last measurement data, a dictionary containing last measurement data.
See sub class of MeasurementTool to get description.
Returns
-------
dict
Return latest measurement or None
"""
return self._latest_measurement
[docs]
def save(self, save_path: Path, with_type: str = "json"):
"""
Save measurement data
Parameters
----------
save_path: Path
Matrix filename
with_type: str
File type (json,yaml,npz)
"""
if with_type == "json":
import json
data = self.latest_measurement
json.dump(data, open(save_path, "w"), indent=4)
elif with_type == "yaml":
import yaml
data = self.latest_measurement
yaml.safe_dump(data, open(save_path, "w"))
elif with_type == "npz":
import numpy as np
data = self.latest_measurement
np.savez(save_path.resolve(), **data)
else:
raise PyAMLException(f"ERROR: Unknown file type to save as: {with_type}.")
[docs]
def send_callback(self, action: Action, cb_data: dict, raiseException: bool = True):
"""
Send callback from this Measurement tool to the caller.
If the callback returns False, the scan is aborted and actuators are restored to their orignal values.
Callback example:
.. code-block:: python
def callback(action: Action, data: dict):
print(f"{action}, data:{data}")
return True
# Measure a tune response matrix using the above callback
sr.design.trm.measure(callback=callback)
Parameters
----------
action: Action
See :py:class:`pyaml.common.constants.Action`
cb_data: dict
Callback data
"""
ok = True
if self._callback is not None:
# Add source and peer
cb_data["mode"] = f"{self.get_peer_name()}"
cb_data["source"] = self
ok = self._callback(action, cb_data)
if not ok and raiseException:
# Abort, same as ctrl+C
raise KeyboardInterrupt
return ok
def _register_callback(self, callback: Callable):
self._callback = callback
[docs]
def attach(self, peer: "ElementHolder") -> Self:
"""
Create a new reference to attach this measurement tool object to a simulator
or a control system.
"""
obj = self.__class__(self._cfg)
obj._peer = peer
return obj