Files

274 lines
8.6 KiB
Python

"""
Tests for core.bus.EventBus
Covers:
- Basic subscribe / publish / unsubscribe
- Wildcard topic matching (*, **)
- Multiple subscribers on the same topic
- Async handler dispatch
- Unsubscribe removes handler correctly
- History recording and retrieval
- publish_sync fire-and-forget
- Error in one handler does not prevent others
"""
from __future__ import annotations
import asyncio
from typing import Any, List
import pytest
from core.bus import EventBus
# ── Fixtures ──────────────────────────────────────────────────────────────
@pytest.fixture
def bus() -> EventBus:
    """Provide a newly constructed ``EventBus`` to the requesting test."""
    fresh_bus = EventBus()
    return fresh_bus
# ── Helper ───────────────────────────────────────────────────────────────
def make_collector() -> tuple[list, Any]:
    """Build a recording handler together with the list it records into.

    Returns:
        A ``(captured, handler)`` pair. ``handler`` is a coroutine function
        taking ``(topic, data)``; every awaited call appends that pair, as a
        tuple, to ``captured``.
    """
    captured: List[tuple[str, Any]] = []

    async def handler(topic: str, data: Any) -> None:
        # Record the call verbatim so tests can inspect topic and payload.
        event = (topic, data)
        captured.append(event)

    return captured, handler
# ── Basic subscribe / publish ─────────────────────────────────────────────
@pytest.mark.asyncio
async def test_basic_publish_subscribe(bus: EventBus) -> None:
    """One subscriber receives the published topic and payload exactly once."""
    events, on_event = make_collector()
    bus.subscribe("test.topic", on_event)

    delivered = await bus.publish("test.topic", {"key": "value"})

    # publish() reports how many handlers were notified.
    assert delivered == 1
    assert events == [("test.topic", {"key": "value"})]
@pytest.mark.asyncio
async def test_no_subscribers_returns_zero(bus: EventBus) -> None:
    """Publishing on a bus with no subscriptions reports zero deliveries."""
    assert await bus.publish("orphan.topic", "data") == 0
@pytest.mark.asyncio
async def test_multiple_subscribers_same_topic(bus: EventBus) -> None:
    """Two handlers on the same topic each receive a single publication once."""
    collectors = [make_collector() for _ in range(2)]
    for _, on_event in collectors:
        bus.subscribe("multi.topic", on_event)

    delivered = await bus.publish("multi.topic", 42)

    assert delivered == 2
    for events, _ in collectors:
        assert len(events) == 1
@pytest.mark.asyncio
async def test_publish_to_different_topic_not_received(bus: EventBus) -> None:
    """A handler subscribed to one topic is not called for a different topic."""
    events, on_event = make_collector()
    bus.subscribe("topic.a", on_event)

    await bus.publish("topic.b", "ignored")

    assert not events
# ── Unsubscribe ───────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_unsubscribe_stops_delivery(bus: EventBus) -> None:
    """After unsubscribing by id, further publications are not delivered."""
    events, on_event = make_collector()
    subscription_id = bus.subscribe("unsub.test", on_event)

    # While subscribed, the publication arrives.
    await bus.publish("unsub.test", "first")
    delivered_before = len(events)
    assert delivered_before == 1

    # Once unsubscribed, the collected list must stop growing.
    bus.unsubscribe(subscription_id)
    await bus.publish("unsub.test", "second")
    delivered_after = len(events)
    assert delivered_after == 1
def test_unsubscribe_unknown_id_returns_false(bus: EventBus) -> None:
    """Unsubscribing with an id the bus never issued returns exactly ``False``."""
    outcome = bus.unsubscribe("not-a-real-id")
    assert outcome is False
@pytest.mark.asyncio
async def test_unsubscribe_all_by_handler(bus: EventBus) -> None:
    """``unsubscribe_all`` drops every subscription of a handler and counts them."""
    topics = ("topic.x", "topic.y")
    events, on_event = make_collector()
    for topic in topics:
        bus.subscribe(topic, on_event)

    assert bus.unsubscribe_all(on_event) == 2

    # Neither topic may reach the handler any more.
    for topic in topics:
        await bus.publish(topic, None)
    assert events == []
# ── Wildcard matching ─────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_wildcard_single_star(bus: EventBus) -> None:
    """``network.*`` receives ``network.<segment>`` topics but not ``system.cpu``."""
    events, on_event = make_collector()
    bus.subscribe("network.*", on_event)

    publications = (
        ("network.interfaces", {"eth0": "up"}),
        ("network.stats", {}),
        ("system.cpu", {}),  # different prefix: must not be delivered
    )
    for topic, payload in publications:
        await bus.publish(topic, payload)

    assert len(events) == 2
    seen_topics = {topic for topic, _ in events}
    assert "network.interfaces" in seen_topics
    assert "network.stats" in seen_topics
@pytest.mark.asyncio
async def test_wildcard_double_star(bus: EventBus) -> None:
    """``network.**`` receives topics one or more segments below ``network``.

    Publishes two topics under ``network`` (one and two segments deep) and one
    unrelated topic, then checks both the number of deliveries and which
    topics were delivered.
    """
    received, handler = make_collector()
    bus.subscribe("network.**", handler)

    await bus.publish("network.interfaces", {})
    await bus.publish("network.interfaces.eth0", {})
    await bus.publish("system.cpu", {})  # should NOT match

    assert len(received) == 2
    # A bare count would also pass if the wrong pair of topics were delivered,
    # so pin the topics too (same checks as the single-star test).
    topics = [r[0] for r in received]
    assert "network.interfaces" in topics
    assert "network.interfaces.eth0" in topics
@pytest.mark.asyncio
async def test_wildcard_bare_star_matches_any_single_segment(bus: EventBus) -> None:
    """A bare ``*`` receives single-segment topics but not dotted ones.

    Publishes two single-segment topics and one two-segment topic, then checks
    both the number of deliveries and which topics were delivered.
    """
    received, handler = make_collector()
    bus.subscribe("*", handler)

    await bus.publish("anything", 1)
    await bus.publish("other", 2)
    await bus.publish("multi.segment", 3)  # NOT matched by bare "*"

    assert len(received) == 2
    # A bare count would also pass if "multi.segment" replaced one of the
    # expected topics, so verify which topics actually arrived.
    topics = [r[0] for r in received]
    assert "anything" in topics
    assert "other" in topics
@pytest.mark.asyncio
async def test_exact_and_wildcard_subscriber_both_notified(bus: EventBus) -> None:
    """One publication reaches both an exact-topic and a wildcard subscriber."""
    exact_events, on_exact = make_collector()
    wildcard_events, on_wildcard = make_collector()
    subscriptions = (("net.iface", on_exact), ("net.*", on_wildcard))
    for pattern, callback in subscriptions:
        bus.subscribe(pattern, callback)

    await bus.publish("net.iface", "data")

    for events in (exact_events, wildcard_events):
        assert len(events) == 1
# ── Async handler dispatch ────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_async_handler_is_awaited(bus: EventBus) -> None:
    """``publish`` returns only after a suspending coroutine handler has finished."""
    seen_payloads: list[Any] = []

    async def slow_handler(topic: str, data: Any) -> None:
        # Yield to the event loop first, so the append only happens if the
        # bus actually awaits the coroutine to completion.
        await asyncio.sleep(0.01)
        seen_payloads.append(data)

    bus.subscribe("async.test", slow_handler)
    await bus.publish("async.test", "payload")

    assert seen_payloads == ["payload"]
@pytest.mark.asyncio
async def test_sync_handler_works(bus: EventBus) -> None:
    """A plain (non-coroutine) function can be subscribed and gets the payload."""
    seen_payloads: list[Any] = []

    def sync_handler(topic: str, data: Any) -> None:
        seen_payloads.append(data)

    bus.subscribe("sync.test", sync_handler)
    await bus.publish("sync.test", "sync-payload")

    assert seen_payloads == ["sync-payload"]
# ── Error isolation ───────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_handler_error_does_not_block_others(bus: EventBus) -> None:
    """A raising handler neither aborts ``publish`` nor starves a later handler."""
    delivered_payloads: list[Any] = []

    async def failing_handler(topic: str, data: Any) -> None:
        raise RuntimeError("intentional test error")

    async def recording_handler(topic: str, data: Any) -> None:
        delivered_payloads.append(data)

    # The failing handler is registered first, ahead of the healthy one.
    for callback in (failing_handler, recording_handler):
        bus.subscribe("error.topic", callback)

    # The RuntimeError must not propagate out of publish().
    notified = await bus.publish("error.topic", "test")

    assert notified == 2
    assert delivered_payloads == ["test"]
# ── History ───────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_history_records_events(bus: EventBus) -> None:
    """Every publication is recorded in history, even with no subscribers.

    Checks the number of entries and, order-independently, that each entry
    carries the topic and payload that were published.
    """
    await bus.publish("hist.a", 1)
    await bus.publish("hist.b", 2)
    await bus.publish("hist.a", 3)

    history = bus.get_history()

    assert len(history) == 3
    # Length alone would not catch entries recorded with the wrong topic or
    # payload. Compare sorted so this test does not depend on entry ordering.
    recorded = sorted((e.topic, e.data) for e in history)
    assert recorded == [("hist.a", 1), ("hist.a", 3), ("hist.b", 2)]
@pytest.mark.asyncio
async def test_history_filter_by_topic(bus: EventBus) -> None:
    """``get_history(topic_filter=...)`` returns only entries for that topic."""
    publications = (("hist.a", 1), ("hist.b", 2), ("hist.a", 3))
    for topic, payload in publications:
        await bus.publish(topic, payload)

    matching = bus.get_history(topic_filter="hist.a")

    assert len(matching) == 2
    assert [entry.topic for entry in matching] == ["hist.a", "hist.a"]
@pytest.mark.asyncio
async def test_history_limit(bus: EventBus) -> None:
    """``get_history(limit=5)`` returns the five newest entries, oldest first."""
    for value in range(20):
        await bus.publish("flood.topic", value)

    recent = bus.get_history(limit=5)

    assert len(recent) == 5
    # Of payloads 0..19, only the last five should remain, in publish order.
    payloads = [entry.data for entry in recent]
    assert payloads == [15, 16, 17, 18, 19]
# ── Introspection ─────────────────────────────────────────────────────────
def test_subscription_count(bus: EventBus) -> None:
    """``subscription_count`` is 0 on a new bus and 2 after two subscriptions."""
    assert bus.subscription_count == 0

    for topic in ("a", "b"):
        _, callback = make_collector()
        bus.subscribe(topic, callback)

    assert bus.subscription_count == 2
def test_patterns(bus: EventBus) -> None:
    """``patterns`` contains every pattern string that has been subscribed."""
    expected_patterns = ("network.*", "system.**")
    _, callback = make_collector()
    for pattern in expected_patterns:
        bus.subscribe(pattern, callback)

    registered = bus.patterns

    for pattern in expected_patterns:
        assert pattern in registered