"""
Firewall Plugin — reads firewall rules from iptables or nftables.

Publishes to:
  firewall.backend  — detected backend ("iptables", "nftables", "none")
  firewall.rules    — parsed rule list
  firewall.chains   — dict of chains with policy and rule count
"""

from __future__ import annotations

import asyncio
import json
import logging
import re
import shutil
import subprocess
from typing import Any, Dict, List, Optional

from plugins.base import BasePlugin

log = logging.getLogger(__name__)

DEFAULT_POLL_INTERVAL = 60.0

# "Chain INPUT (policy ACCEPT 0 packets, 0 bytes)" for built-in chains,
# "Chain DOCKER (1 references)" for user-defined chains (no policy).
_IPTABLES_CHAIN_RE = re.compile(r"^Chain (\S+) \((?:policy (\w+))?")

# Columns of `iptables -L -n -v --line-numbers`:
#   num pkts bytes target prot opt in out source destination [match text]
# Without -x, iptables abbreviates large counters with a K/M/G/T suffix.
_IPTABLES_RULE_RE = re.compile(
    r"^\s*(?P<num>\d+)"
    r"\s+(?P<pkts>\d+[KMGT]?)"
    r"\s+(?P<bytes>\d+[KMGT]?)"
    r"\s+(?P<target>\S+)"
    r"\s+(?P<prot>\S+)"
    r"\s+(?P<opt>\S+)"
    r"\s+(?P<iface_in>\S+)"
    r"\s+(?P<iface_out>\S+)"
    r"\s+(?P<source>\S+)"
    r"\s+(?P<rest>.*)"
)


def _detect_backend() -> str:
    """Detect which firewall backend is available.

    Returns:
        "nftables" if an ``nft`` executable is on PATH, otherwise
        "iptables" if an ``iptables`` executable is on PATH, otherwise "none".
    """
    if shutil.which("nft"):
        return "nftables"
    if shutil.which("iptables"):
        return "iptables"
    return "none"


def _run(args: List[str]) -> str:
    """Run a subprocess and return its stdout.

    Args:
        args: Command and arguments, passed to ``subprocess.run`` as a list
            (no shell involved).

    Returns:
        The captured stdout as text. Returns '' when the command cannot be
        started or does not finish within 10 seconds. A non-zero exit status
        is logged at debug level; whatever stdout was produced is still
        returned.
    """
    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        # OSError covers FileNotFoundError / PermissionError and any other
        # failure to launch the executable.
        log.debug("command %r failed: %s", args, exc)
        return ""
    if result.returncode != 0:
        log.debug(
            "command %r exited with status %d: %s",
            args,
            result.returncode,
            (result.stderr or "").strip(),
        )
    return result.stdout


def _parse_iptables() -> Dict:
    """Parse ``iptables -L -n -v --line-numbers`` output into structured data.

    Returns:
        A dict with keys:
          "chains":  chain name -> {"policy": str, "rule_count": int};
                     user-defined chains have an empty policy string.
          "rules":   list of rule dicts with keys chain, num, pkts, bytes,
                     target, prot, in, out, source, destination. "pkts" and
                     "bytes" are kept as the strings iptables printed.
                     "destination" holds everything after the source column.
          "backend": "iptables"
    """
    output = _run(["iptables", "-L", "-n", "-v", "--line-numbers"])
    chains: Dict[str, Dict] = {}
    rules: List[Dict] = []
    current_chain: Optional[str] = None

    for line in output.splitlines():
        header = _IPTABLES_CHAIN_RE.match(line)
        if header:
            # Register every chain header, including user-defined chains, so
            # the rules that follow are attributed to the right chain.
            current_chain = header.group(1)
            chains[current_chain] = {
                "policy": header.group(2) or "",
                "rule_count": 0,
            }
            continue

        if current_chain is None:
            continue

        m = _IPTABLES_RULE_RE.match(line)
        if m is None:
            # Column header lines and blank separators land here.
            continue

        rules.append(
            {
                "chain": current_chain,
                "num": int(m.group("num")),
                "pkts": m.group("pkts"),
                "bytes": m.group("bytes"),
                "target": m.group("target"),
                "prot": m.group("prot"),
                "in": m.group("iface_in"),
                "out": m.group("iface_out"),
                "source": m.group("source"),
                "destination": m.group("rest").strip(),
            }
        )
        chains[current_chain]["rule_count"] += 1

    return {"chains": chains, "rules": rules, "backend": "iptables"}


def _nft_plaintext_rules(output: str) -> List[Dict]:
    """Turn each non-blank line of *output* into a rule with an unknown chain."""
    return [
        {"chain": "unknown", "expr": stripped}
        for stripped in (raw.strip() for raw in output.splitlines())
        if stripped
    ]


def _parse_nftables() -> Dict:
    """Parse ``nft -j list ruleset`` output into structured data.

    Returns:
        A dict with keys:
          "chains":  chain name -> {"table": str, "policy": str,
                     "rule_count": int}
          "rules":   list of {"chain", "table", "handle", "expr"} dicts, where
                     "expr" is the ``str()`` of the rule's expression value.
          "backend": "nftables"

        If the output is not a JSON object, every non-blank output line is
        returned as a rule with chain "unknown" and "chains" is empty.
    """
    output = _run(["nft", "-j", "list", "ruleset"])
    chains: Dict[str, Dict] = {}
    rules: List[Dict] = []

    # Only the decode step is guarded, so a single malformed item can no
    # longer trigger the plain-text fallback on top of already parsed rules.
    try:
        data = json.loads(output)
        if not isinstance(data, dict):
            raise ValueError("top-level JSON value is not an object")
    except ValueError as exc:
        log.debug("nftables JSON parse failed, falling back: %s", exc)
        return {
            "chains": chains,
            "rules": _nft_plaintext_rules(output),
            "backend": "nftables",
        }

    items = data.get("nftables", [])
    if not isinstance(items, list):
        items = []

    for item in items:
        if not isinstance(item, dict):
            continue
        if "chain" in item:
            chain = item["chain"]
            if not isinstance(chain, dict) or "name" not in chain:
                continue
            # NOTE(review): chains are keyed by name only, so same-named
            # chains in different tables overwrite each other here.
            chains[chain["name"]] = {
                "table": chain.get("table", ""),
                "policy": chain.get("policy", ""),
                "rule_count": 0,
            }
        elif "rule" in item:
            rule = item["rule"]
            if not isinstance(rule, dict):
                continue
            chain_name = rule.get("chain", "")
            rules.append(
                {
                    "chain": chain_name,
                    "table": rule.get("table", ""),
                    "handle": rule.get("handle", ""),
                    "expr": str(rule.get("expr", "")),
                }
            )
            if chain_name in chains:
                chains[chain_name]["rule_count"] += 1

    return {"chains": chains, "rules": rules, "backend": "nftables"}


class FirewallPlugin(BasePlugin):
    """
    Reads and monitors firewall rules from iptables or nftables.
    Automatically detects the available backend.
    """

    name = "firewall"
    version = "1.0.0"
    description = "Reads firewall rules from iptables or nftables"
    tags = ["security", "network", "firewall"]

    def __init__(
        self,
        bus=None,
        state=None,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
    ) -> None:
        """Create the plugin.

        Args:
            bus: Passed through to ``BasePlugin.__init__``.
            state: Passed through to ``BasePlugin.__init__``.
            poll_interval: Seconds to sleep between two collections.
        """
        super().__init__(bus=bus, state=state)
        self._poll_interval = poll_interval
        self._backend: str = "none"
        self._task: asyncio.Task | None = None
        # Strong references to on-demand refresh tasks; the event loop only
        # keeps weak references to tasks.
        self._refresh_tasks: set = set()
        self._running = False

    def on_load(self) -> None:
        """Detect the backend, subscribe to refresh requests, start polling."""
        super().on_load()
        self._backend = _detect_backend()
        self._log.info("firewall backend detected: %s", self._backend)
        self.state_set("firewall.backend", self._backend)
        self.subscribe("firewall.refresh")
        self._running = True
        try:
            loop = asyncio.get_running_loop()
            self._task = loop.create_task(self._poll_loop())
        except RuntimeError:
            self._log.debug("no running event loop at load time; task deferred")

    def on_unload(self) -> None:
        """Stop polling and cancel any outstanding tasks."""
        self._running = False
        if self._task and not self._task.done():
            self._task.cancel()
        for task in list(self._refresh_tasks):
            if not task.done():
                task.cancel()
        super().on_unload()

    async def _poll_loop(self) -> None:
        """Collect and publish firewall data until stopped or cancelled.

        Errors raised by a collection are logged and the loop continues.
        Cancellation is not swallowed: it propagates to the task.
        """
        self._log.debug(
            "firewall poll loop started (interval=%.1fs)", self._poll_interval
        )
        try:
            while self._running:
                try:
                    await self._collect_and_publish()
                except Exception as exc:
                    # CancelledError derives from BaseException and is
                    # therefore not caught here.
                    self._log.error("firewall poll error: %s", exc)
                await asyncio.sleep(self._poll_interval)
        finally:
            self._log.debug("firewall poll loop stopped")

    async def _collect_and_publish(self) -> None:
        """Read the rules for the detected backend and publish them.

        The blocking parsers run in a worker thread via ``asyncio.to_thread``.
        """
        if self._backend == "iptables":
            data = await asyncio.to_thread(_parse_iptables)
        elif self._backend == "nftables":
            data = await asyncio.to_thread(_parse_nftables)
        else:
            data = {"chains": {}, "rules": [], "backend": "none"}

        self.state_set("firewall.rules", data.get("rules", []))
        self.state_set("firewall.chains", data.get("chains", {}))

        if self._bus:
            # NOTE(review): the "firewall.rules" topic carries the whole
            # result dict (chains, rules, backend), not only the rule list
            # stored in state above — confirm this is what subscribers expect.
            await self._bus.publish("firewall.rules", data)
            await self._bus.publish("firewall.chains", data.get("chains", {}))

    def _on_refresh_done(self, task: asyncio.Task) -> None:
        """Drop a finished refresh task and log its failure, if any."""
        self._refresh_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self._log.error("firewall refresh error: %s", exc)

    def on_event(self, topic: str, data: Any) -> None:
        """Schedule an immediate collection when "firewall.refresh" arrives.

        Other topics are ignored. The request is dropped, with a debug log,
        if no event loop is running in the calling thread.
        """
        if topic != "firewall.refresh":
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            self._log.debug("firewall.refresh ignored: no running event loop")
            return
        task = loop.create_task(self._collect_and_publish())
        self._refresh_tasks.add(task)
        task.add_done_callback(self._on_refresh_done)