""" Metro Warden State Store — single source of truth with reactive watchers. The StateStore holds application state in a nested dict-like structure. Keys use dot-separated paths: "network.interfaces.eth0.rx_bytes". Watchers are notified synchronously (and the bus is published to) on every set(). """ from __future__ import annotations import copy import logging from datetime import datetime, timezone from typing import Any, Callable, Dict, List, Optional, Set import uuid log = logging.getLogger(__name__) Watcher = Callable[[str, Any, Any], None] # (key, old_value, new_value) class StateStore: """ Reactive key/value state store. Keys are dot-separated paths. Watchers are called with ``(key, old_value, new_value)`` whenever a value changes. If an :class:`~core.bus.EventBus` is supplied, every state mutation also publishes a ``state.`` event to the bus so that UI widgets can react via bus subscriptions. Usage:: store = StateStore(bus=bus) store.set("network.active", True) store.watch("network.active", lambda k, old, new: print(new)) value = store.get("network.active") """ def __init__(self, bus=None) -> None: self._data: Dict[str, Any] = {} self._watchers: Dict[str, List[tuple[str, Watcher]]] = {} self._bus = bus # optional EventBus # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def get(self, key: str, default: Any = None) -> Any: """Return the value at *key*, or *default* if not set.""" return copy.deepcopy(self._data.get(key, default)) def set(self, key: str, value: Any) -> None: """ Set *key* to *value*. Fires watchers and publishes to the bus if the value changed. """ old = self._data.get(key) self._data[key] = value if old == value: return # no-op — value unchanged log.debug("state set %r: %r -> %r", key, old, value) self._notify_watchers(key, old, value) self._publish_to_bus(key, value) def delete(self, key: str) -> bool: """Remove *key* from the store. Returns True if it existed.""" if key not in self._data: return False old = self._data.pop(key) self._notify_watchers(key, old, None) self._publish_to_bus(key, None) return True def update(self, mapping: Dict[str, Any]) -> None: """Set multiple keys at once from a dict.""" for key, value in mapping.items(): self.set(key, value) def watch(self, key: str, callback: Watcher) -> str: """ Register *callback* to be called whenever *key* changes. Supports ``*`` wildcard at the end: ``"network.*"`` will fire for any key whose first segment is ``"network"``. Returns a watcher ID that can be passed to :meth:`unwatch`. """ watcher_id = str(uuid.uuid4()) if key not in self._watchers: self._watchers[key] = [] self._watchers[key].append((watcher_id, callback)) log.debug("watch registered %s on key=%r", watcher_id[:8], key) return watcher_id def unwatch(self, watcher_id: str) -> bool: """Remove a watcher by its ID. Returns True if it was found.""" for key, entries in self._watchers.items(): for entry in entries: if entry[0] == watcher_id: entries.remove(entry) return True return False def snapshot(self) -> Dict[str, Any]: """Return a deep copy of the entire state.""" return copy.deepcopy(self._data) def keys(self) -> List[str]: """Return all keys currently in the store.""" return list(self._data.keys()) def __contains__(self, key: str) -> bool: return key in self._data def __repr__(self) -> str: return f"StateStore(keys={len(self._data)}, watchers={sum(len(v) for v in self._watchers.values())})" # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _notify_watchers(self, key: str, old: Any, new: Any) -> None: """Notify all watchers whose pattern matches *key*.""" notified: Set[str] = set() # Exact match for wid, cb in self._watchers.get(key, []): if wid not in notified: notified.add(wid) try: cb(key, old, new) except Exception as exc: log.error("watcher %s error: %s", wid[:8], exc) # Wildcard match — check each registered pattern for pattern, entries in self._watchers.items(): if pattern == key: continue # already handled above if self._key_matches(key, pattern): for wid, cb in entries: if wid not in notified: notified.add(wid) try: cb(key, old, new) except Exception as exc: log.error("watcher %s error: %s", wid[:8], exc) def _publish_to_bus(self, key: str, value: Any) -> None: if self._bus is None: return topic = f"state.{key}" self._bus.publish_sync(topic, {"key": key, "value": value}) @staticmethod def _key_matches(key: str, pattern: str) -> bool: """Simple glob matching for state keys.""" import fnmatch return fnmatch.fnmatch(key, pattern)