""" Tests for core.bus.EventBus Covers: - Basic subscribe / publish / unsubscribe - Wildcard topic matching (*, **) - Multiple subscribers on the same topic - Async handler dispatch - Unsubscribe removes handler correctly - History recording and retrieval - publish_sync fire-and-forget - Error in one handler does not prevent others """ from __future__ import annotations import asyncio from typing import Any, List import pytest from core.bus import EventBus # ── Fixtures ────────────────────────────────────────────────────────────── @pytest.fixture def bus() -> EventBus: return EventBus() # ── Helper ─────────────────────────────────────────────────────────────── def make_collector() -> tuple[list, Any]: """Return (received_list, async_handler).""" received: List[tuple[str, Any]] = [] async def handler(topic: str, data: Any) -> None: received.append((topic, data)) return received, handler # ── Basic subscribe / publish ───────────────────────────────────────────── @pytest.mark.asyncio async def test_basic_publish_subscribe(bus: EventBus) -> None: received, handler = make_collector() bus.subscribe("test.topic", handler) count = await bus.publish("test.topic", {"key": "value"}) assert count == 1 assert len(received) == 1 assert received[0] == ("test.topic", {"key": "value"}) @pytest.mark.asyncio async def test_no_subscribers_returns_zero(bus: EventBus) -> None: count = await bus.publish("orphan.topic", "data") assert count == 0 @pytest.mark.asyncio async def test_multiple_subscribers_same_topic(bus: EventBus) -> None: received_a, handler_a = make_collector() received_b, handler_b = make_collector() bus.subscribe("multi.topic", handler_a) bus.subscribe("multi.topic", handler_b) count = await bus.publish("multi.topic", 42) assert count == 2 assert len(received_a) == 1 assert len(received_b) == 1 @pytest.mark.asyncio async def test_publish_to_different_topic_not_received(bus: EventBus) -> None: received, handler = make_collector() bus.subscribe("topic.a", handler) await bus.publish("topic.b", "ignored") assert len(received) == 0 # ── Unsubscribe ─────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_unsubscribe_stops_delivery(bus: EventBus) -> None: received, handler = make_collector() sub_id = bus.subscribe("unsub.test", handler) await bus.publish("unsub.test", "first") assert len(received) == 1 bus.unsubscribe(sub_id) await bus.publish("unsub.test", "second") assert len(received) == 1 # no new message def test_unsubscribe_unknown_id_returns_false(bus: EventBus) -> None: assert bus.unsubscribe("not-a-real-id") is False @pytest.mark.asyncio async def test_unsubscribe_all_by_handler(bus: EventBus) -> None: received, handler = make_collector() bus.subscribe("topic.x", handler) bus.subscribe("topic.y", handler) removed = bus.unsubscribe_all(handler) assert removed == 2 await bus.publish("topic.x", None) await bus.publish("topic.y", None) assert len(received) == 0 # ── Wildcard matching ───────────────────────────────────────────────────── @pytest.mark.asyncio async def test_wildcard_single_star(bus: EventBus) -> None: received, handler = make_collector() bus.subscribe("network.*", handler) await bus.publish("network.interfaces", {"eth0": "up"}) await bus.publish("network.stats", {}) await bus.publish("system.cpu", {}) # should NOT match assert len(received) == 2 topics = [r[0] for r in received] assert "network.interfaces" in topics assert "network.stats" in topics @pytest.mark.asyncio async def test_wildcard_double_star(bus: EventBus) -> None: received, handler = make_collector() bus.subscribe("network.**", handler) await bus.publish("network.interfaces", {}) await bus.publish("network.interfaces.eth0", {}) await bus.publish("system.cpu", {}) # should NOT match assert len(received) == 2 @pytest.mark.asyncio async def test_wildcard_bare_star_matches_any_single_segment(bus: EventBus) -> None: received, handler = make_collector() bus.subscribe("*", handler) await bus.publish("anything", 1) await bus.publish("other", 2) await bus.publish("multi.segment", 3) # NOT matched by bare "*" assert len(received) == 2 @pytest.mark.asyncio async def test_exact_and_wildcard_subscriber_both_notified(bus: EventBus) -> None: exact, handler_exact = make_collector() wild, handler_wild = make_collector() bus.subscribe("net.iface", handler_exact) bus.subscribe("net.*", handler_wild) await bus.publish("net.iface", "data") assert len(exact) == 1 assert len(wild) == 1 # ── Async handler dispatch ──────────────────────────────────────────────── @pytest.mark.asyncio async def test_async_handler_is_awaited(bus: EventBus) -> None: result = [] async def slow_handler(topic: str, data: Any) -> None: await asyncio.sleep(0.01) result.append(data) bus.subscribe("async.test", slow_handler) await bus.publish("async.test", "payload") assert result == ["payload"] @pytest.mark.asyncio async def test_sync_handler_works(bus: EventBus) -> None: result = [] def sync_handler(topic: str, data: Any) -> None: result.append(data) bus.subscribe("sync.test", sync_handler) await bus.publish("sync.test", "sync-payload") assert result == ["sync-payload"] # ── Error isolation ─────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_handler_error_does_not_block_others(bus: EventBus) -> None: good_received = [] async def bad_handler(topic: str, data: Any) -> None: raise RuntimeError("intentional test error") async def good_handler(topic: str, data: Any) -> None: good_received.append(data) bus.subscribe("error.topic", bad_handler) bus.subscribe("error.topic", good_handler) # Should not raise count = await bus.publish("error.topic", "test") assert count == 2 assert good_received == ["test"] # ── History ─────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_history_records_events(bus: EventBus) -> None: await bus.publish("hist.a", 1) await bus.publish("hist.b", 2) await bus.publish("hist.a", 3) history = bus.get_history() assert len(history) == 3 @pytest.mark.asyncio async def test_history_filter_by_topic(bus: EventBus) -> None: await bus.publish("hist.a", 1) await bus.publish("hist.b", 2) await bus.publish("hist.a", 3) filtered = bus.get_history(topic_filter="hist.a") assert len(filtered) == 2 assert all(e.topic == "hist.a" for e in filtered) @pytest.mark.asyncio async def test_history_limit(bus: EventBus) -> None: for i in range(20): await bus.publish("flood.topic", i) history = bus.get_history(limit=5) assert len(history) == 5 # Most recent 5 assert [e.data for e in history] == list(range(15, 20)) # ── Introspection ───────────────────────────────────────────────────────── def test_subscription_count(bus: EventBus) -> None: _, h1 = make_collector() _, h2 = make_collector() assert bus.subscription_count == 0 bus.subscribe("a", h1) bus.subscribe("b", h2) assert bus.subscription_count == 2 def test_patterns(bus: EventBus) -> None: _, h = make_collector() bus.subscribe("network.*", h) bus.subscribe("system.**", h) patterns = bus.patterns assert "network.*" in patterns assert "system.**" in patterns