mirror of
https://github.com/samjage/metro-warden.git
synced 2026-06-06 01:00:41 +00:00
165 lines
5.6 KiB
Python
165 lines
5.6 KiB
Python
"""
|
|
Metro Warden State Store — single source of truth with reactive watchers.
|
|
|
|
The StateStore holds application state in a nested dict-like structure.
|
|
Keys use dot-separated paths: "network.interfaces.eth0.rx_bytes".
|
|
Watchers are notified synchronously (and the bus is published to) on every set().
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
import uuid
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
Watcher = Callable[[str, Any, Any], None] # (key, old_value, new_value)
|
|
|
|
|
|
class StateStore:
|
|
"""
|
|
Reactive key/value state store.
|
|
|
|
Keys are dot-separated paths. Watchers are called with
|
|
``(key, old_value, new_value)`` whenever a value changes.
|
|
|
|
If an :class:`~core.bus.EventBus` is supplied, every state mutation
|
|
also publishes a ``state.<key>`` event to the bus so that UI widgets
|
|
can react via bus subscriptions.
|
|
|
|
Usage::
|
|
|
|
store = StateStore(bus=bus)
|
|
store.set("network.active", True)
|
|
store.watch("network.active", lambda k, old, new: print(new))
|
|
value = store.get("network.active")
|
|
"""
|
|
|
|
def __init__(self, bus=None) -> None:
|
|
self._data: Dict[str, Any] = {}
|
|
self._watchers: Dict[str, List[tuple[str, Watcher]]] = {}
|
|
self._bus = bus # optional EventBus
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def get(self, key: str, default: Any = None) -> Any:
|
|
"""Return the value at *key*, or *default* if not set."""
|
|
return copy.deepcopy(self._data.get(key, default))
|
|
|
|
def set(self, key: str, value: Any) -> None:
|
|
"""
|
|
Set *key* to *value*.
|
|
|
|
Fires watchers and publishes to the bus if the value changed.
|
|
"""
|
|
old = self._data.get(key)
|
|
self._data[key] = value
|
|
|
|
if old == value:
|
|
return # no-op — value unchanged
|
|
|
|
log.debug("state set %r: %r -> %r", key, old, value)
|
|
self._notify_watchers(key, old, value)
|
|
self._publish_to_bus(key, value)
|
|
|
|
def delete(self, key: str) -> bool:
|
|
"""Remove *key* from the store. Returns True if it existed."""
|
|
if key not in self._data:
|
|
return False
|
|
old = self._data.pop(key)
|
|
self._notify_watchers(key, old, None)
|
|
self._publish_to_bus(key, None)
|
|
return True
|
|
|
|
def update(self, mapping: Dict[str, Any]) -> None:
|
|
"""Set multiple keys at once from a dict."""
|
|
for key, value in mapping.items():
|
|
self.set(key, value)
|
|
|
|
def watch(self, key: str, callback: Watcher) -> str:
|
|
"""
|
|
Register *callback* to be called whenever *key* changes.
|
|
|
|
Supports ``*`` wildcard at the end: ``"network.*"`` will fire
|
|
for any key whose first segment is ``"network"``.
|
|
|
|
Returns a watcher ID that can be passed to :meth:`unwatch`.
|
|
"""
|
|
watcher_id = str(uuid.uuid4())
|
|
if key not in self._watchers:
|
|
self._watchers[key] = []
|
|
self._watchers[key].append((watcher_id, callback))
|
|
log.debug("watch registered %s on key=%r", watcher_id[:8], key)
|
|
return watcher_id
|
|
|
|
def unwatch(self, watcher_id: str) -> bool:
|
|
"""Remove a watcher by its ID. Returns True if it was found."""
|
|
for key, entries in self._watchers.items():
|
|
for entry in entries:
|
|
if entry[0] == watcher_id:
|
|
entries.remove(entry)
|
|
return True
|
|
return False
|
|
|
|
def snapshot(self) -> Dict[str, Any]:
|
|
"""Return a deep copy of the entire state."""
|
|
return copy.deepcopy(self._data)
|
|
|
|
def keys(self) -> List[str]:
|
|
"""Return all keys currently in the store."""
|
|
return list(self._data.keys())
|
|
|
|
def __contains__(self, key: str) -> bool:
|
|
return key in self._data
|
|
|
|
def __repr__(self) -> str:
|
|
return f"StateStore(keys={len(self._data)}, watchers={sum(len(v) for v in self._watchers.values())})"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _notify_watchers(self, key: str, old: Any, new: Any) -> None:
|
|
"""Notify all watchers whose pattern matches *key*."""
|
|
notified: Set[str] = set()
|
|
|
|
# Exact match
|
|
for wid, cb in self._watchers.get(key, []):
|
|
if wid not in notified:
|
|
notified.add(wid)
|
|
try:
|
|
cb(key, old, new)
|
|
except Exception as exc:
|
|
log.error("watcher %s error: %s", wid[:8], exc)
|
|
|
|
# Wildcard match — check each registered pattern
|
|
for pattern, entries in self._watchers.items():
|
|
if pattern == key:
|
|
continue # already handled above
|
|
if self._key_matches(key, pattern):
|
|
for wid, cb in entries:
|
|
if wid not in notified:
|
|
notified.add(wid)
|
|
try:
|
|
cb(key, old, new)
|
|
except Exception as exc:
|
|
log.error("watcher %s error: %s", wid[:8], exc)
|
|
|
|
def _publish_to_bus(self, key: str, value: Any) -> None:
|
|
if self._bus is None:
|
|
return
|
|
topic = f"state.{key}"
|
|
self._bus.publish_sync(topic, {"key": key, "value": value})
|
|
|
|
@staticmethod
|
|
def _key_matches(key: str, pattern: str) -> bool:
|
|
"""Simple glob matching for state keys."""
|
|
import fnmatch
|
|
|
|
return fnmatch.fnmatch(key, pattern)
|