Coverage for core / platform / events.py: 72.1%
208 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Event Bus — Topic-based pub/sub with Crossbar WAMP bridge.
4Decouples HART OS subsystems without direct imports. Any module can
5emit events, any other can subscribe — config changes, app lifecycle,
6theme updates, etc.
8Design decisions:
9- Topic-based: dot-separated names (e.g., 'config.display.scale')
10- Wildcard subscriptions: 'theme.*' matches 'theme.changed', 'theme.preset.applied'
11- Sync dispatch by default (callback in emitter's thread)
12- Optional async_emit() for non-blocking (uses core/event_loop.py)
13- Events are plain dicts — no custom event classes
14- Thread-safe via threading.Lock
15- WAMP bridge: local events optionally publish to Crossbar; WAMP events
16 fire local callbacks. Topic mapping: 'theme.changed' ↔ 'com.hartos.event.theme.changed'
18Generalizes patterns from:
19- model_bus_service.py (multi-transport routing concept)
20- crossbar_server.py (WAMP component lifecycle)
21- wamp_bridge.py (Crossbar topic conventions)
23Usage:
24 bus = EventBus()
25 bus.on('config.display.scale', handle_scale_change)
26 bus.on('theme.*', handle_any_theme_event)
27 bus.emit('config.display.scale', {'old': 1.0, 'new': 1.5})
28 bus.off('config.display.scale', handle_scale_change)
30 # Optional WAMP bridge (cross-process / cross-device)
31 bus.connect_wamp('ws://localhost:8088/ws', 'realm1')
32"""
34import asyncio
35import fnmatch
36import json
37import logging
38import os
39import threading
40from typing import Any, Callable, Dict, List, Optional
42logger = logging.getLogger('hevolve.platform')
44# WAMP topic prefix — matches crossbar_server.py / wamp_bridge.py convention
45WAMP_TOPIC_PREFIX = 'com.hartos.event'
48def _local_to_wamp(topic: str) -> str:
49 """Convert local dot-topic to WAMP URI. theme.changed → com.hartos.event.theme.changed"""
50 return f'{WAMP_TOPIC_PREFIX}.{topic}'
53def _wamp_to_local(uri: str) -> Optional[str]:
54 """Convert WAMP URI to local dot-topic. com.hartos.event.theme.changed → theme.changed"""
55 prefix = WAMP_TOPIC_PREFIX + '.'
56 if uri.startswith(prefix):
57 return uri[len(prefix):]
58 return None
61# Topic prefixes EXCLUDED from SSE fan-out. Default empty: every emit
62# reaches local + WAMP + SSE. Add a prefix here ONLY when a topic
63# proves too noisy / internal for end-clients (high-frequency tick
64# events, per-token streaming, debug probes). Most platform topics
65# (theme.*, resonance.*, federation.*, inference.*, memory.*,
66# action_state.*) are valid SSE traffic for admin dashboards / telemetry
67# views, so they stay on by default.
68#
69# 'bus.': MessageBus.publish auto-emits a `bus.<topic>` echo of every
70# publish (core/peer_link/message_bus.py:367) for HARTOS-internal
71# cross-subsystem subscribers. These are NOT meant for the SPA — the
72# canonical SSE delivery is the SEPARATE `_route_sse` leg in the same
73# publish() call (line 373) which calls broadcast_sse_safe with the
74# RAW topic. Without this denylist entry the EventBus auto-bridge at
75# line 218 fires broadcast_sse_safe AGAIN with `bus.<topic>` — two
76# SSE events for one publish, with different `type=` keys, defeating
77# the SPA's msg_id||request_id dedup keys (which diverge across the
78# two envelope shapes). Live evidence 2026-05-10 20:28:29 showed
79# TTS audio playing twice on the same chat turn caused by exactly
80# this dual-bridge race (bus.chat.pupit + chat.pupit + message all
81# fired in 10ms). Adding the denylist entry keeps `bus.*` events
82# HARTOS-internal while preserving the canonical SSE leg.
83_SSE_DENYLIST_PREFIXES: tuple = ('bus.',)
86def _topic_targets_sse(topic: str) -> bool:
87 """True unless the topic is on the SSE denylist."""
88 return not any(topic.startswith(prefix) for prefix in _SSE_DENYLIST_PREFIXES)
91class EventBus:
92 """Topic-based pub/sub event bus with optional Crossbar WAMP bridge.
94 The decoupling layer for HART OS — subsystems communicate through
95 events instead of direct imports. When a WAMP session is connected,
96 every local emit() also publishes to Crossbar, and WAMP subscriptions
97 fire local callbacks, enabling cross-process and cross-device events.
98 """
100 def __init__(self):
101 self._listeners: Dict[str, List[Callable]] = {}
102 self._wildcard_listeners: Dict[str, List[Callable]] = {}
103 self._lock = threading.Lock()
104 self._emit_count: int = 0
105 # WAMP bridge state
106 self._wamp_session = None
107 self._wamp_connected = False
108 self._wamp_loop: Optional[asyncio.AbstractEventLoop] = None
109 self._wamp_thread: Optional[threading.Thread] = None
110 self._wamp_subscribed_topics: set = set()
111 self._bridged_topics: set = set() # topics currently bridged to WAMP
113 def on(self, topic: str, callback: Callable) -> None:
114 """Subscribe to a topic.
116 Args:
117 topic: Event topic. Use '*' for wildcard matching:
118 'theme.*' matches 'theme.changed', 'theme.preset.applied'
119 '*' matches everything.
120 callback: Called with (topic, data) when event fires.
121 """
122 with self._lock:
123 if '*' in topic:
124 if topic not in self._wildcard_listeners:
125 self._wildcard_listeners[topic] = []
126 self._wildcard_listeners[topic].append(callback)
127 else:
128 if topic not in self._listeners:
129 self._listeners[topic] = []
130 self._listeners[topic].append(callback)
132 def off(self, topic: str, callback: Callable) -> None:
133 """Unsubscribe from a topic.
135 Args:
136 topic: Same topic string used in on().
137 callback: Same callback reference used in on().
138 """
139 with self._lock:
140 target = (self._wildcard_listeners if '*' in topic
141 else self._listeners)
142 if topic in target:
143 try:
144 target[topic].remove(callback)
145 if not target[topic]:
146 del target[topic]
147 except ValueError:
148 pass
150 def once(self, topic: str, callback: Callable) -> None:
151 """Subscribe to a topic for one event only.
153 After the first matching event, the callback is automatically removed.
154 """
155 def wrapper(t, data):
156 self.off(topic, wrapper)
157 callback(t, data)
158 self.on(topic, wrapper)
160 def emit(self, topic: str, data: Any = None, _from_wamp: bool = False) -> int:
161 """Emit an event synchronously.
163 Args:
164 topic: Event topic (e.g., 'config.display.scale').
165 data: Event payload (any JSON-serializable value, typically dict).
166 _from_wamp: Internal flag — True when event originated from WAMP
167 (prevents echo loop back to Crossbar).
169 Cross-transport dedup contract:
170 - Each emit() injects a unique ``msg_id`` (uuid4 hex) into the
171 data dict if the caller didn't set one. This is the
172 per-event dedup key clients use to suppress duplicates that
173 arrive via multiple transports (WAMP + SSE).
174 - ``request_id`` is the per-request GROUPING key (multiple
175 thinking events share one request_id but each has its own
176 msg_id). Clients use request_id for filtering daemon traces
177 and grouping into thinking containers, NOT for dedup.
178 - For proactive emits with no request_id (agent self-initiated
179 thinking, telemetry pushes), msg_id is the ONLY id needed.
180 Each event renders independently because msg_ids are unique.
182 Returns:
183 Number of listeners that were called.
184 """
185 self._emit_count += 1
186 called = 0
188 # Inject per-event dedup id (uuid4 hex). Skipped if caller
189 # already supplied msg_id — they may want to use a domain-
190 # specific stable id for replay-on-reconnect scenarios.
191 if isinstance(data, dict) and 'msg_id' not in data:
192 import uuid as _uuid
193 data['msg_id'] = _uuid.uuid4().hex
195 # Exact match listeners
196 with self._lock:
197 exact = list(self._listeners.get(topic, []))
198 wildcards = []
199 for pattern, cbs in self._wildcard_listeners.items():
200 if fnmatch.fnmatch(topic, pattern):
201 wildcards.extend(cbs)
203 for cb in exact:
204 try:
205 cb(topic, data)
206 called += 1
207 except Exception as e:
208 logger.warning("Event listener error on '%s': %s", topic, e)
210 for cb in wildcards:
211 try:
212 cb(topic, data)
213 called += 1
214 except Exception as e:
215 logger.warning("Wildcard listener error on '%s': %s", topic, e)
217 # Bridge to WAMP (skip if event already came from WAMP → no echo)
218 if not _from_wamp and self._wamp_connected and self._wamp_session:
219 self._publish_to_wamp(topic, data)
221 # Bridge to SSE (Nunba desktop / Android web view). This grew the
222 # SSE transport adapter the broadcast_sse_safe docstring asked for
223 # (line 393 of this file). Topic policy is a DENYLIST (see
224 # _SSE_DENYLIST_PREFIXES at module top, default empty) — every
225 # topic fans out to SSE by default; add a prefix to the denylist
226 # only when a topic proves too noisy / internal for end-clients.
227 # Per-event dedup happens client-side via msg_id (auto-injected
228 # below for dict payloads), so the same event arriving via WAMP
229 # and SSE renders once. Echo guard: skip when the event came
230 # from WAMP so a WAMP→local→SSE round trip doesn't double-deliver
231 # a message that the WAMP bridge already shipped on its own
232 # connection.
233 if not _from_wamp and _topic_targets_sse(topic):
234 _user_id = data.get('user_id') if isinstance(data, dict) else None
235 broadcast_sse_safe(topic, data, user_id=_user_id)
237 return called
239 def emit_async(self, topic: str, data: Any = None) -> None:
240 """Emit an event asynchronously (fire-and-forget in a thread).
242 Uses a daemon thread so it won't block shutdown.
243 """
244 t = threading.Thread(target=self.emit, args=(topic, data), daemon=True)
245 t.start()
247 def has_listeners(self, topic: str) -> bool:
248 """Check if a topic has any subscribers (exact or wildcard)."""
249 with self._lock:
250 if topic in self._listeners and self._listeners[topic]:
251 return True
252 for pattern in self._wildcard_listeners:
253 if fnmatch.fnmatch(topic, pattern):
254 return True
255 return False
257 def topics(self) -> List[str]:
258 """Return all topics with registered listeners."""
259 with self._lock:
260 exact = list(self._listeners.keys())
261 wild = list(self._wildcard_listeners.keys())
262 return exact + wild
264 def clear(self) -> None:
265 """Remove all listeners. For testing."""
266 with self._lock:
267 self._listeners.clear()
268 self._wildcard_listeners.clear()
270 # ─── WAMP / Crossbar Bridge ─────────────────────────────
272 def connect_wamp(self, url: str = None, realm: str = None) -> bool:
273 """Connect EventBus to Crossbar WAMP router.
275 Local events are published to WAMP; WAMP events fire local callbacks.
276 Uses autobahn (same as crossbar_server.py / wamp_bridge.py).
278 Args:
279 url: WebSocket URL (default: CBURL env or ws://localhost:8088/ws)
280 realm: WAMP realm (default: CBREALM env or realm1)
282 Returns:
283 True if connection initiated (async — may not be connected yet).
284 """
285 try:
286 from autobahn.asyncio.component import Component
287 except ImportError:
288 logger.warning("autobahn not installed — WAMP bridge unavailable")
289 return False
291 url = url or os.environ.get('CBURL', 'ws://localhost:8088/ws')
292 realm = realm or os.environ.get('CBREALM', 'realm1')
294 component = Component(transports=url, realm=realm)
295 bus = self # closure capture
297 @component.on_join
298 async def on_join(session, details):
299 bus._wamp_session = session
300 bus._wamp_connected = True
301 logger.info("EventBus WAMP bridge connected to %s (realm=%s)", url, realm)
303 # Subscribe to the wildcard topic for all HARTOS events
304 wamp_wildcard = f'{WAMP_TOPIC_PREFIX}.'
305 try:
306 await session.subscribe(bus._on_wamp_event, wamp_wildcard,
307 options={'match': 'prefix'})
308 logger.info("EventBus subscribed to WAMP prefix: %s", wamp_wildcard)
309 except Exception as e:
310 logger.warning("WAMP wildcard subscribe failed: %s", e)
312 @component.on_leave
313 async def on_leave(session, details):
314 bus._wamp_connected = False
315 bus._wamp_session = None
316 logger.info("EventBus WAMP bridge disconnected")
318 # Run WAMP component in a background thread with its own event loop
319 # Reconnects with exponential backoff on disconnect/failure
320 bus._wamp_stop = False
322 def _run():
323 import time as _time
324 backoff = 1
325 max_backoff = 60
326 while not bus._wamp_stop:
327 loop = asyncio.new_event_loop()
328 bus._wamp_loop = loop
329 asyncio.set_event_loop(loop)
330 connected_at = None
331 try:
332 connected_at = _time.time()
333 loop.run_until_complete(component.start(loop=loop))
334 except Exception as e:
335 logger.warning("WAMP component exited: %s — reconnecting in %ds", e, backoff)
336 finally:
337 bus._wamp_loop = None
338 bus._wamp_connected = False
339 bus._wamp_session = None
340 # Reset backoff if connection lived > 60s (was a real session)
341 if connected_at and (_time.time() - connected_at) > 60:
342 backoff = 1
343 _time.sleep(backoff)
344 backoff = min(backoff * 2, max_backoff)
346 self._wamp_thread = threading.Thread(target=_run, daemon=True, name='eventbus-wamp')
347 self._wamp_thread.start()
348 return True
350 def disconnect_wamp(self):
351 """Disconnect from Crossbar WAMP router."""
352 self._wamp_stop = True
353 self._wamp_connected = False
354 session = self._wamp_session
355 self._wamp_session = None
356 if session and self._wamp_loop:
357 try:
358 asyncio.run_coroutine_threadsafe(session.leave(), self._wamp_loop)
359 except Exception:
360 pass
361 logger.info("EventBus WAMP bridge disconnected")
363 def _publish_to_wamp(self, topic: str, data: Any):
364 """Publish a local event to WAMP (fire-and-forget)."""
365 session = self._wamp_session
366 loop = self._wamp_loop
367 if not session or not loop:
368 return
369 wamp_uri = _local_to_wamp(topic)
370 # Serialize data to JSON-safe dict for WAMP transport
371 try:
372 payload = json.loads(json.dumps(data, default=str)) if data is not None else {}
373 except (TypeError, ValueError):
374 payload = {'value': str(data)}
375 try:
376 asyncio.run_coroutine_threadsafe(
377 session.publish(wamp_uri, payload), loop
378 )
379 except Exception as e:
380 logger.debug("WAMP publish failed for %s: %s", wamp_uri, e)
382 async def _on_wamp_event(self, *args, **kwargs):
383 """Handle incoming WAMP event → dispatch to local listeners."""
384 # autobahn passes positional args; first is payload, details in kwargs
385 payload = args[0] if args else kwargs
386 details = kwargs.get('details')
387 # Extract the WAMP topic from details
388 wamp_topic = getattr(details, 'topic', None) if details else None
389 if not wamp_topic:
390 return
391 local_topic = _wamp_to_local(wamp_topic)
392 if local_topic:
393 # Dispatch locally, but mark _from_wamp to prevent echo
394 self.emit(local_topic, payload, _from_wamp=True)
396 @property
397 def wamp_connected(self) -> bool:
398 """Whether the WAMP bridge is currently connected."""
399 return self._wamp_connected
401 # ─── Properties & Health ──────────────────────────────────
403 @property
404 def emit_count(self) -> int:
405 """Total number of emit() calls since creation."""
406 return self._emit_count
408 def health(self) -> dict:
409 """Health report for ServiceRegistry integration."""
410 with self._lock:
411 exact_count = sum(len(v) for v in self._listeners.values())
412 wild_count = sum(len(v) for v in self._wildcard_listeners.values())
413 return {
414 'status': 'ok',
415 'listeners': exact_count + wild_count,
416 'topics': len(self._listeners) + len(self._wildcard_listeners),
417 'total_emits': self._emit_count,
418 'wamp_connected': self._wamp_connected,
419 }
422# ─── Module-level helper — safe emit without circular imports ─────
424def emit_event(topic: str, data: Any = None, async_: bool = True) -> None:
425 """Emit an event on the platform EventBus (if bootstrapped).
427 Safe to call from anywhere — no-ops if the platform hasn't been bootstrapped.
428 Uses emit_async by default to avoid blocking the caller.
430 Args:
431 topic: Dot-separated topic (e.g., 'theme.changed', 'resonance.tuned')
432 data: JSON-serializable payload
433 async_: If True (default), emit in a background thread
434 """
435 try:
436 from core.platform.registry import get_registry
437 registry = get_registry()
438 if not registry.has('events'):
439 return
440 bus = registry.get('events')
441 if async_:
442 bus.emit_async(topic, data)
443 else:
444 bus.emit(topic, data)
445 except Exception:
446 pass # Never block callers — event emission is best-effort
449def broadcast_sse_safe(event_type: str, data: Any, user_id: Any = None) -> bool:
450 """Best-effort SSE broadcast to Nunba desktop clients.
452 Nunba's ``main.py`` exposes ``broadcast_sse_event`` on ``__main__`` —
453 the SSE fan-out is still a distinct transport from the WAMP /
454 MessageBus path, so callers that want to reach SSE listeners must
455 push there explicitly. This helper encapsulates the
456 ``import __main__`` + ``sys.modules.get('main_module')`` fallback
457 chain so ``hart_intelligence_entry``, ``integrations.social.realtime``,
458 ``model_orchestrator``, etc. don't each reimplement the same 10-line
459 try/except block. Until EventBus grows a proper SSE transport
460 adapter, this is the canonical single-call entrypoint for pushing a
461 payload to SSE subscribers.
463 Args:
464 event_type: SSE ``event:`` type string (e.g. ``'notification'``,
465 ``'message'``, ``'capability_update'``).
466 data: JSON-serializable payload dict.
467 user_id: Target user for per-user routing. ``None`` broadcasts
468 to every connected SSE client.
470 Returns:
471 ``True`` if the broadcast function was resolved and invoked,
472 ``False`` otherwise. Never raises — SSE delivery is best-effort
473 and must not block the caller or mask the primary transport.
474 """
475 try:
476 import sys as _sys
477 import __main__ as _main_mod
478 broadcast = getattr(_main_mod, 'broadcast_sse_event', None)
479 if broadcast is None:
480 _mm = _sys.modules.get('main_module')
481 if _mm is not None:
482 broadcast = getattr(_mm, 'broadcast_sse_event', None)
483 if broadcast is None:
484 return False
485 broadcast(event_type, data, user_id=user_id)
486 return True
487 except Exception as e:
488 logger.debug("SSE broadcast skipped: %s", e)
489 return False