Source code for pyaml.yellow_pages

"""
yellow_pages.py

Fully dynamic YellowPages service attached to :class:`~pyaml.accelerator.Accelerator`.

Key points:
    - Auto-discovery only: arrays, tools and diagnostics are discovered at runtime
      by scanning all modes.
    - No caching: every call reflects current runtime state.
    - Simple query syntax for identifiers:
        - wildcard / fnmatch:
            yp["OH4*"]
        - regular expression:
            yp["re:^SH1A-C0[12]-H$"]

:class:`~pyaml.accelerator.Accelerator` interface
------------------------------
    - controls()   -> dict[str, ElementHolder]
    - simulators() -> dict[str, ElementHolder]
    - modes()      -> dict[str, ElementHolder]
"""

import fnmatch
import re
from enum import Enum
from typing import TYPE_CHECKING, Any

from pyaml import PyAMLException

if TYPE_CHECKING:
    from .accelerator import Accelerator


[docs] class YellowPagesCategory(str, Enum): ARRAYS = "Arrays" TOOLS = "Tools" DIAGNOSTICS = "Diagnostics"
[docs] class YellowPagesError(PyAMLException): """YellowPages-specific error with clear user-facing messages."""
[docs] class YellowPagesQueryError(YellowPagesError): """Raised when a YellowPages query string cannot be evaluated."""
_VALID_KEY_RE = re.compile(r"^[A-Z0-9_]+$")
[docs] class YellowPages: r""" Dynamic discovery service for accelerator objects. :class:`YellowPages` provides a unified access layer to arrays, tuning tools and diagnostics available in an :class:`~pyaml.accelerator.Accelerator`. Entries are discovered dynamically by scanning all :class:`~pyaml.element_holder.ElementHolder` instances associated with the :class:`~pyaml.accelerator.Accelerator` control and simulation modes. Notes ----- The :class:`~pyaml.accelerator.Accelerator` must provide: - ``controls() -> dict[str, ElementHolder]`` - ``simulators() -> dict[str, ElementHolder]`` - ``modes() -> dict[str, ElementHolder]`` Examples -------- Print the global overview: .. code-block:: python >>> print(sr.yellow_pages) Resolve an entry across all modes: .. code-block:: python >>> sr.yellow_pages.get("BPM") Resolve in a specific mode: .. code-block:: python >>> sr.yellow_pages.get("BPM", mode="live") Search identifiers using wildcards: .. code-block:: python >>> sr.yellow_pages["OH4*"] Search identifiers using a regular expression: .. code-block:: python >>> sr.yellow_pages["re:^SH1A-C0[12]-H$"] """ def __init__(self, accelerator: "Accelerator"): self._acc = accelerator
[docs] def has(self, key: str) -> bool: r""" Check whether a YellowPages key exists. Parameters ---------- key : str Entry name. Returns ------- bool ``True`` if the key exists. Examples -------- .. code-block:: python >>> sr.yellow_pages.has("BPM") True .. code-block:: python >>> sr.yellow_pages.has("UNKNOWN") False """ return key in self._all_keys()
[docs] def categories(self) -> list[str]: r""" Return the list of available categories. Only categories that contain entries are returned. Returns ------- list[str] Examples -------- .. code-block:: python >>> sr.yellow_pages.categories() ['Arrays', 'Tools', 'Diagnostics'] """ discovered = self._discover() present = {cat for cat, keys in discovered.items() if keys} return [c.value for c in YellowPagesCategory if c in present]
[docs] def keys(self, category: str | YellowPagesCategory | None = None) -> list[str]: r""" Return available YellowPages keys. Parameters ---------- category : str or YellowPagesCategory, optional Restrict the result to a specific category. Returns ------- list[str] Examples -------- All entries: .. code-block:: python >>> sr.yellow_pages.keys() Only arrays: .. code-block:: python >>> sr.yellow_pages.keys("Arrays") Using enum: .. code-block:: python >>> sr.yellow_pages.keys(YellowPagesCategory.ARRAYS) """ discovered = self._discover() if category is None: return self._all_keys() cat = YellowPagesCategory(category) return discovered.get(cat, [])
[docs] def __dir__(self): """ Extend ``dir()`` with attribute-friendly discovered keys. """ default = super().__dir__() return sorted(set(default) | {k for k in self._all_keys() if _VALID_KEY_RE.match(k)})
[docs] def __getattr__(self, name): """ Allow attribute-style access for valid discovered keys. Examples -------- .. code-block:: python >>> sr.yellow_pages.BPM """ if name in self._all_keys(): return self._get_object(name) raise AttributeError(f"'YellowPages' object has no attribute '{name}'")
[docs] def availability(self, key: str) -> set[str]: r""" Return the set of modes where a key is available. Parameters ---------- key : str Entry name. Returns ------- set[str] Examples -------- .. code-block:: python >>> sr.yellow_pages.availability("BPM") >>> sr.yellow_pages.availability("DEFAULT_ORBIT_CORRECTION") """ self._require_key(key) avail: set[str] = set() for mode_name, holder in self._acc.modes().items(): if self._try_resolve_in_holder(key, holder) is not None: avail.add(mode_name) return avail
def _get_object(self, key: str, *, mode: str | None = None): r""" Resolve a YellowPages entry. Parameters ---------- key : str Entry name. mode : str, optional Restrict the resolution to a specific mode. Returns ------- object or dict[str, object] If ``mode`` is specified, returns the resolved object. Otherwise returns a dictionary mapping mode names to resolved objects. Raises ------ KeyError If the key does not exist. YellowPagesError If the mode is unknown or the key is not available in the requested mode. Examples -------- Resolve across all modes: .. code-block:: python >>> sr.yellow_pages.get("BPM") Resolve in a specific mode: .. code-block:: python >>> sr.yellow_pages.get("BPM", mode="live") Using attribute access: .. code-block:: python >>> sr.yellow_pages.BPM """ self._require_key(key) if mode is not None: holder = self._acc.modes().get(mode) if holder is None: raise YellowPagesError(f"Unknown mode '{mode}'.") obj = self._try_resolve_in_holder(key, holder) if obj is None: raise YellowPagesError(f"YellowPages key '{key}' not available in mode '{mode}'.") return obj out: dict[str, Any] = {} for mode_name, holder in self._acc.modes().items(): obj = self._try_resolve_in_holder(key, holder) if obj is not None: out[mode_name] = obj return out
[docs] def __getitem__(self, query: str) -> list[str]: """ Alias for :meth:`get`. """ return self.get(query)
[docs] def get(self, query: str, mode: str | None = None) -> list[str]: """ Search identifiers using a wildcard or regular expression. Parameters ---------- query : str Search expression. mode : str, optional Restrict the search to a specific :class:`~pyaml.accelerator.Accelerator` mode. Returns ------- list[str] Examples -------- .. code-block:: python >>> sr.yellow_pages.get("OH4*") >>> sr.yellow_pages.get("OH4*", mode="live") >>> sr.yellow_pages.get("re:^SH1A-C0[12]-H$") """ if not query or not query.strip(): raise YellowPagesQueryError("Empty YellowPages query.") query = query.strip() if mode is not None: holder = self._acc.modes().get(mode) if holder is None: raise YellowPagesError(f"Unknown mode '{mode}'.") if query in holder._list_arrays(): return self._object_to_ids(holder._get_array(query)) else: ids = self._ids_from_holder(holder) else: if query in self.keys(YellowPagesCategory.ARRAYS): if mode is None: arr_list: list[str] = [] for a_mode in self._acc.modes(): try: obj = self._get_object(query, mode=a_mode) array_ids = self._object_to_ids(obj) self._extend_unique(arr_list, array_ids) except Exception: continue return arr_list return self._object_to_ids(self._get_object(query, mode=mode)) ids = self._all_known_ids() if query.startswith("re:"): pattern = query[3:] try: rx = re.compile(pattern) except re.error as ex: raise YellowPagesQueryError(f"Invalid regex '{pattern}': {ex}") from ex return [i for i in ids if rx.search(i)] return [i for i in ids if fnmatch.fnmatch(i, query)]
[docs] def __repr__(self) -> str: r""" Return a human-readable overview of the YellowPages content. The representation lists: - controls - simulators - discovered arrays, tools and diagnostics The displayed type corresponds to the Python module defining the resolved object. Examples -------- .. code-block:: python >>> print(sr.yellow_pages) Example output: .. code-block:: text Controls: live . Simulators: design . Arrays: BPM (pyaml.arrays.bpm_array) size=224 HCORR (pyaml.arrays.magnet_array) size=96 . Tools: DEFAULT_ORBIT_CORRECTION (pyaml.tuning_tools.orbit) . Diagnostics: BETATRON_TUNE (pyaml.diagnostics.tune_monitor) . """ lines: list[str] = [] lines.append("Controls:") controls = self._acc.controls() if controls: for name in controls.keys(): lines.append(f" {name}") lines.append(" .") lines.append("") lines.append("Simulators:") simulators = self._acc.simulators() if simulators: for name in simulators.keys(): lines.append(f" {name}") lines.append(" .") lines.append("") discovered = self._discover() for cat in YellowPagesCategory: keys = discovered.get(cat, set()) if not keys: continue lines.append(f"{cat.value}:") for key in keys: lines.append(self._format_key(cat, key)) lines.append(" .") lines.append("") return "\n".join(lines).rstrip()
[docs] def __str__(self) -> str: return self.__repr__()
def _discover(self) -> dict[YellowPagesCategory, list[str]]: arrays: list[str] = [] tools: list[str] = [] diags: list[str] = [] for _, holder in self._acc.modes().items(): try: self._extend_unique(arrays, holder._list_arrays()) except Exception: pass try: self._extend_unique(tools, holder._list_tools()) except Exception: pass try: self._extend_unique(diags, holder._list_diagnostics()) except Exception: pass return { YellowPagesCategory.ARRAYS: arrays, YellowPagesCategory.TOOLS: tools, YellowPagesCategory.DIAGNOSTICS: diags, } def _extend_unique(self, target: list[str], values: list[str]) -> None: """ Append values to target while preserving insertion order and uniqueness. """ for value in values: if value not in target: target.append(value) def _all_keys(self) -> list[str]: discovered = self._discover() out: list[str] = [] for keys in discovered.values(): self._extend_unique(out, keys) return out def _require_key(self, key: str) -> None: if key not in self._all_keys(): raise KeyError(self._unknown_key_message(key)) def _unknown_key_message(self, key: str) -> str: available = ", ".join(self._all_keys()) return f"Unknown YellowPages key '{key}'. Available keys: {available if available else '<none>'}" def _try_resolve_in_holder(self, key: str, holder: Any) -> Any | None: """ Resolve a discovered key in a holder. Resolution order: - arrays - tools - diagnostics """ try: if key in holder._list_arrays(): return holder._get_array(key) except Exception: pass try: if key in holder._list_tools(): return holder._get_tool(key) except Exception: pass try: if key in holder._list_diagnostics(): return holder._get_diagnostic(key) except Exception: pass return None def _get_type_name_from_resolved(self, resolved: dict[str, Any]) -> str | None: """ Return the public type name used in ``__repr__``. Only the module path is displayed, not the concrete class name. Examples -------- .. code-block:: text pyaml.arrays.bpm_array pyaml.tuning_tools.orbit pyaml.diagnostics.tune_monitor """ for obj in resolved.values(): if obj is None: continue return obj.__class__.__module__ return None def _format_key(self, category: YellowPagesCategory, key: str) -> str: """ Format one discovered key for ``__repr__``. """ resolved = self._get_object(key) type_name = self._get_type_name_from_resolved(resolved) type_part = f" ({type_name})" if type_name else "" modes = list(resolved.keys()) all_modes = list(self._acc.modes().keys()) availability_part = "" if set(modes) != set(all_modes): missing = [mode for mode in all_modes if mode not in modes] availability_part = f" modes={modes} missing={missing}" if category == YellowPagesCategory.ARRAYS: sizes: dict[str, int] = {} for mode_name, obj in resolved.items(): try: sizes[mode_name] = len(obj) except Exception: sizes[mode_name] = 0 if modes == all_modes and sizes and len(set(sizes.values())) == 1: size_part = f" size={next(iter(sizes.values()))}" else: size_part = " size={" + ", ".join(f"{m}:{n}" for m, n in sizes.items()) + "}" return f" {key:<21}{type_part:<40}{size_part}{availability_part}" return f" {key}{type_part}{availability_part}" def _object_to_ids(self, obj: Any) -> list[str]: """ Convert a resolved object into a set of identifiers. """ if obj is None: return [] if isinstance(obj, (list, tuple, set)) and all(isinstance(x, str) for x in obj): return list(obj) ids: list[str] = list() try: for x in obj: if isinstance(x, str): if x not in ids: ids.append(x) elif hasattr(x, "get_name") and callable(x.get_name): if x.get_name() not in ids: ids.append(x.get_name()) elif hasattr(x, "name") and callable(x.name): if x.name() not in ids: ids.append(x.name()) elif hasattr(x, "name") and isinstance(x.name, str): if x.name not in ids: ids.append(x.name) else: if str(x) not in ids: ids.append(str(x)) return ids except TypeError: if isinstance(obj, str): return [obj] if hasattr(obj, "get_name") and callable(obj.get_name): return [obj.get_name()] return [str(obj)] def _ids_for_key_union_all_modes(self, key: str) -> list[str]: out: list[str] = [] resolved = self._get_object(key) for obj in resolved.values(): self._extend_unique(out, self._object_to_ids(obj)) return out def _all_known_ids(self) -> list[str]: """ Collect all identifiers from all discovered arrays across all modes. """ all_ids: list[str] = [] for array_name in self.keys(YellowPagesCategory.ARRAYS): try: self._extend_unique(all_ids, self._ids_for_key_union_all_modes(array_name)) except Exception: continue return all_ids def _ids_from_holder(self, holder) -> list[str]: ids: list[str] = [] try: for name in holder._list_arrays(): arr = holder._get_array(name) self._extend_unique(ids, self._object_to_ids(arr)) except Exception: pass return ids