mirror of
https://github.com/samjage/metro-warden.git
synced 2026-06-06 04:00:42 +00:00
130 lines
4.2 KiB
Python
130 lines
4.2 KiB
Python
"""
|
|
Network Plugin — monitors network interfaces and traffic using psutil.
|
|
|
|
Publishes to:
|
|
network.interfaces — dict of {iface: {status, ip4, ip6, rx_bytes, tx_bytes, ...}}
|
|
network.stats — aggregate stats snapshot
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import socket
|
|
from typing import Any, Dict
|
|
|
|
try:
|
|
import psutil
|
|
_PSUTIL_AVAILABLE = True
|
|
except ImportError:
|
|
_PSUTIL_AVAILABLE = False
|
|
|
|
from plugins.base import BasePlugin
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Polling interval in seconds
|
|
DEFAULT_POLL_INTERVAL = 5.0
|
|
|
|
|
|
def _get_interfaces() -> Dict[str, Dict]:
|
|
"""Collect interface statistics from psutil."""
|
|
if not _PSUTIL_AVAILABLE:
|
|
return {}
|
|
|
|
stats: Dict[str, Dict] = {}
|
|
io_counters = psutil.net_io_counters(pernic=True)
|
|
if_stats = psutil.net_if_stats()
|
|
if_addrs = psutil.net_if_addrs()
|
|
|
|
for iface, io in io_counters.items():
|
|
nic_stat = if_stats.get(iface)
|
|
addrs = if_addrs.get(iface, [])
|
|
|
|
ip4 = ""
|
|
ip6 = ""
|
|
mac = ""
|
|
for addr in addrs:
|
|
if addr.family == socket.AF_INET:
|
|
ip4 = addr.address
|
|
elif addr.family == socket.AF_INET6:
|
|
ip6 = addr.address
|
|
elif addr.family == psutil.AF_LINK:
|
|
mac = addr.address
|
|
|
|
stats[iface] = {
|
|
"status": "UP" if (nic_stat and nic_stat.isup) else "DOWN",
|
|
"speed": nic_stat.speed if nic_stat else 0,
|
|
"mtu": nic_stat.mtu if nic_stat else 0,
|
|
"ip4": ip4,
|
|
"ip6": ip6,
|
|
"mac": mac,
|
|
"rx_bytes": io.bytes_recv,
|
|
"tx_bytes": io.bytes_sent,
|
|
"rx_packets": io.packets_recv,
|
|
"tx_packets": io.packets_sent,
|
|
"rx_errors": io.errin,
|
|
"tx_errors": io.errout,
|
|
"rx_drop": io.dropin,
|
|
"tx_drop": io.dropout,
|
|
}
|
|
|
|
return stats
|
|
|
|
|
|
class NetworkPlugin(BasePlugin):
|
|
"""Monitors network interfaces and publishes stats to the event bus."""
|
|
|
|
name = "network"
|
|
version = "1.0.0"
|
|
description = "Monitors network interfaces and traffic statistics via psutil"
|
|
tags = ["network", "monitoring"]
|
|
|
|
def __init__(self, bus=None, state=None, poll_interval: float = DEFAULT_POLL_INTERVAL) -> None:
|
|
super().__init__(bus=bus, state=state)
|
|
self._poll_interval = poll_interval
|
|
self._task: asyncio.Task | None = None
|
|
self._running = False
|
|
|
|
def on_load(self) -> None:
|
|
super().on_load()
|
|
if not _PSUTIL_AVAILABLE:
|
|
self._log.warning("psutil not available — network monitoring degraded")
|
|
self._running = True
|
|
# Schedule the polling loop
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
self._task = loop.create_task(self._poll_loop())
|
|
except RuntimeError:
|
|
self._log.debug("no running event loop at load time; task deferred")
|
|
|
|
def on_unload(self) -> None:
|
|
self._running = False
|
|
if self._task and not self._task.done():
|
|
self._task.cancel()
|
|
super().on_unload()
|
|
|
|
async def _poll_loop(self) -> None:
|
|
"""Periodically collect interface data and publish to bus."""
|
|
self._log.debug("network poll loop started (interval=%.1fs)", self._poll_interval)
|
|
while self._running:
|
|
try:
|
|
data = await asyncio.to_thread(_get_interfaces)
|
|
self.state_set("network.interfaces", data)
|
|
await self._bus.publish("network.interfaces", data) if self._bus else None
|
|
await self._bus.publish("network.stats", {
|
|
"interface_count": len(data),
|
|
"active_count": sum(1 for v in data.values() if v["status"] == "UP"),
|
|
}) if self._bus else None
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as exc:
|
|
self._log.error("poll error: %s", exc)
|
|
await asyncio.sleep(self._poll_interval)
|
|
self._log.debug("network poll loop stopped")
|
|
|
|
def on_event(self, topic: str, data: Any) -> None:
|
|
"""Handle requests to refresh immediately."""
|
|
if topic == "network.refresh":
|
|
self._log.debug("manual refresh requested")
|