Source code for pyaml.tuning_tools.measurement_tool

import logging
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Optional, Self

from pydantic import ConfigDict

from ..common.constants import Action
from ..common.element import Element, ElementConfigModel
from ..common.exception import PyAMLException

if TYPE_CHECKING:
    from ..common.element_holder import ElementHolder

logger = logging.getLogger(__name__)


[docs] class MeasurementToolConfigModel(ElementConfigModel): """ Measurement tool configuration model Parameters ---------- n_step: int, optional Number of measurement step [-delta/n_step..delta/n_step] Default 1 sleep_between_step: float, optional Default sleep time after an actuator excitation Default: 0 n_avg_meas : int, optional Default number of measurement per step used for averaging Default 1 sleep_between_meas: float, optional Default sleep time between two measurments Default: 0 """ model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") n_step: Optional[int] = 1 sleep_between_step: Optional[float] = 0 n_avg_meas: Optional[int] = 1 sleep_between_meas: Optional[float] = 0
[docs] class MeasurementTool(Element, metaclass=ABCMeta): """ Base class for measurement tool such as reponse matrix measurement or other scans. """ def __init__(self, name): super().__init__(name) self._latest_measurement: dict = None """ """ self._peer: "ElementHolder" = None # Peer: ControlSystem or Simulator self._callback: Callable = None def _init_measure(self, measurement_type: str | None = None): # Initialize measurement data self._latest_measurement = {} if measurement_type is not None: self._latest_measurement["type"] = measurement_type
[docs] @abstractmethod def measure(self) -> bool: """ Launch measurement Returns ------- bool True if the process has been aborted """ raise NotImplementedError()
@property def latest_measurement(self) -> dict: """ Return last measurement data, a dictionary containing last measurement data. See sub class of MeasurementTool to get description. Returns ------- dict Return latest measurement or None """ return self._latest_measurement
[docs] def get(self) -> dict: """ Return last measurement data, a dictionary containing last measurement data. See sub class of MeasurementTool to get description. Returns ------- dict Return latest measurement or None """ return self._latest_measurement
[docs] def save(self, save_path: Path, with_type: str = "json"): """ Save measurement data Parameters ---------- save_path: Path Matrix filename with_type: str File type (json,yaml,npz) """ if with_type == "json": import json data = self.latest_measurement json.dump(data, open(save_path, "w"), indent=4) elif with_type == "yaml": import yaml data = self.latest_measurement yaml.safe_dump(data, open(save_path, "w")) elif with_type == "npz": import numpy as np data = self.latest_measurement np.savez(save_path.resolve(), **data) else: raise PyAMLException(f"ERROR: Unknown file type to save as: {with_type}.")
[docs] def send_callback(self, action: Action, cb_data: dict, raiseException: bool = True): """ Send callback from this Measurement tool to the caller. If the callback returns False, the scan is aborted and actuators are restored to their orignal values. Callback example: .. code-block:: python def callback(action: Action, data: dict): print(f"{action}, data:{data}") return True # Measure a tune response matrix using the above callback sr.design.trm.measure(callback=callback) Parameters ---------- action: Action See :py:class:`pyaml.common.constants.Action` cb_data: dict Callback data """ ok = True if self._callback is not None: # Add source and peer cb_data["mode"] = f"{self.get_peer_name()}" cb_data["source"] = self ok = self._callback(action, cb_data) if not ok and raiseException: # Abort, same as ctrl+C raise KeyboardInterrupt return ok
def _register_callback(self, callback: Callable): self._callback = callback
[docs] def attach(self, peer: "ElementHolder") -> Self: """ Create a new reference to attach this measurement tool object to a simulator or a control system. """ obj = self.__class__(self._cfg) obj._peer = peer return obj