Files
metro-warden/ui/tabs/garrison.py
T

152 lines
5.2 KiB
Python

"""
Garrison Tab — firewall rules and chain overview.
Subscribes to firewall.rules and firewall.chains events from the
FirewallPlugin and renders them in DataTable widgets.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from textual.app import ComposeResult
from textual.binding import Binding
from textual.widget import Widget
from textual.widgets import DataTable, Label, Static, TabbedContent, TabPane
class GarrisonTab(Widget):
"""
Firewall / security tab.
Two sub-sections:
- Chains: shows chain names, policies, and rule counts
- Rules: shows individual firewall rules
"""
DEFAULT_CSS = """
GarrisonTab {
layout: vertical;
height: 1fr;
padding: 1 2;
}
GarrisonTab .tab-title {
color: $primary;
text-style: bold;
height: 1;
margin-bottom: 1;
}
GarrisonTab .backend-label {
height: 1;
color: $text-muted;
margin-bottom: 1;
}
GarrisonTab DataTable {
height: 1fr;
border: round $primary;
}
GarrisonTab .section-label {
color: $accent;
text-style: bold;
height: 1;
margin-top: 1;
margin-bottom: 1;
}
GarrisonTab .hint {
height: 1;
color: $text-muted;
margin-top: 1;
}
"""
BINDINGS = [Binding("r", "refresh_firewall", "Refresh")]
CHAIN_COLUMNS = [("Chain", 16), ("Policy", 10), ("Rules", 8)]
RULE_COLUMNS = [("Chain", 12), ("Target", 10), ("Proto", 8), ("In", 8), ("Out", 8), ("Source", 18), ("Destination", 20)]
def compose(self) -> ComposeResult:
yield Static("// GARRISON — Firewall", classes="tab-title")
yield Static("Backend: detecting…", id="garrison-backend", classes="backend-label")
yield Static("Chains", classes="section-label")
yield DataTable(id="garrison-chains", zebra_stripes=True, cursor_type="row")
yield Static("Rules", classes="section-label")
yield DataTable(id="garrison-rules", zebra_stripes=True, cursor_type="row")
yield Static("Press [bold]r[/bold] to refresh", classes="hint")
def on_mount(self) -> None:
chains_table = self.query_one("#garrison-chains", DataTable)
for col, width in self.CHAIN_COLUMNS:
chains_table.add_column(col, width=width)
rules_table = self.query_one("#garrison-rules", DataTable)
for col, width in self.RULE_COLUMNS:
rules_table.add_column(col, width=width)
app = self.app
if hasattr(app, "bus"):
app.bus.subscribe("firewall.rules", self._on_rules)
app.bus.subscribe("firewall.chains", self._on_chains)
app.bus.subscribe("state.firewall.backend", self._on_backend)
# Read initial state
if hasattr(app, "state"):
backend = app.state.get("firewall.backend", "unknown")
self._set_backend_label(backend)
rules_data = app.state.get("firewall.rules", [])
chains_data = app.state.get("firewall.chains", {})
if rules_data:
self._populate_rules(rules_data)
if chains_data:
self._populate_chains(chains_data)
def _on_backend(self, topic: str, data: Any) -> None:
backend = data.get("value", "unknown") if isinstance(data, dict) else str(data)
self.call_from_thread(self._set_backend_label, backend)
def _set_backend_label(self, backend: str) -> None:
self.query_one("#garrison-backend", Static).update(f"Backend: [bold]{backend}[/bold]")
def _on_rules(self, topic: str, data: Any) -> None:
if isinstance(data, dict):
rules = data.get("rules", [])
chains = data.get("chains", {})
backend = data.get("backend", "")
self.call_from_thread(self._populate_rules, rules)
if chains:
self.call_from_thread(self._populate_chains, chains)
if backend:
self.call_from_thread(self._set_backend_label, backend)
def _on_chains(self, topic: str, data: Any) -> None:
if isinstance(data, dict):
self.call_from_thread(self._populate_chains, data)
def _populate_chains(self, chains: Dict) -> None:
table = self.query_one("#garrison-chains", DataTable)
table.clear()
for name, info in sorted(chains.items()):
table.add_row(
name,
info.get("policy", ""),
str(info.get("rule_count", 0)),
)
def _populate_rules(self, rules: List[Dict]) -> None:
table = self.query_one("#garrison-rules", DataTable)
table.clear()
for rule in rules:
table.add_row(
rule.get("chain", ""),
rule.get("target", rule.get("expr", ""))[:20],
rule.get("prot", ""),
rule.get("in", ""),
rule.get("out", ""),
rule.get("source", rule.get("table", "")),
rule.get("destination", ""),
)
def action_refresh_firewall(self) -> None:
app = self.app
if hasattr(app, "bus"):
app.bus.publish_sync("firewall.refresh", {})