Coverage for core / platform / events.py: 72.1%

208 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Event Bus — Topic-based pub/sub with Crossbar WAMP bridge. 

3 

4Decouples HART OS subsystems without direct imports. Any module can 

5emit events, any other can subscribe — config changes, app lifecycle, 

6theme updates, etc. 

7 

8Design decisions: 

9- Topic-based: dot-separated names (e.g., 'config.display.scale') 

10- Wildcard subscriptions: 'theme.*' matches 'theme.changed', 'theme.preset.applied' 

11- Sync dispatch by default (callback in emitter's thread) 

12- Optional emit_async() for non-blocking dispatch (fire-and-forget daemon thread) 

13- Events are plain dicts — no custom event classes 

14- Thread-safe via threading.Lock 

15- WAMP bridge: local events optionally publish to Crossbar; WAMP events 

16 fire local callbacks. Topic mapping: 'theme.changed' ↔ 'com.hartos.event.theme.changed' 

17 

18Generalizes patterns from: 

19- model_bus_service.py (multi-transport routing concept) 

20- crossbar_server.py (WAMP component lifecycle) 

21- wamp_bridge.py (Crossbar topic conventions) 

22 

23Usage: 

24 bus = EventBus() 

25 bus.on('config.display.scale', handle_scale_change) 

26 bus.on('theme.*', handle_any_theme_event) 

27 bus.emit('config.display.scale', {'old': 1.0, 'new': 1.5}) 

28 bus.off('config.display.scale', handle_scale_change) 

29 

30 # Optional WAMP bridge (cross-process / cross-device) 

31 bus.connect_wamp('ws://localhost:8088/ws', 'realm1') 

32""" 

33 

34import asyncio 

35import fnmatch 

36import json 

37import logging 

38import os 

39import threading 

40from typing import Any, Callable, Dict, List, Optional 

41 

# Module logger, registered under the fixed name 'hevolve.platform'
# (not derived from __name__).
logger = logging.getLogger('hevolve.platform')

# WAMP topic prefix — matches crossbar_server.py / wamp_bridge.py convention.
# _local_to_wamp() prepends it (plus a dot) and _wamp_to_local() strips it.
WAMP_TOPIC_PREFIX = 'com.hartos.event'

46 

47 

def _local_to_wamp(topic: str) -> str:
    """Map a local dot-topic onto its WAMP URI.

    The result is WAMP_TOPIC_PREFIX, a dot, then the local topic, e.g.
    theme.changed → com.hartos.event.theme.changed
    """
    return '{}.{}'.format(WAMP_TOPIC_PREFIX, topic)

51 

52 

def _wamp_to_local(uri: str) -> Optional[str]:
    """Map a WAMP URI back onto its local dot-topic.

    com.hartos.event.theme.changed → theme.changed

    Returns None when the URI does not start with WAMP_TOPIC_PREFIX
    followed by a dot.
    """
    marker = f'{WAMP_TOPIC_PREFIX}.'
    if not uri.startswith(marker):
        return None
    # Drop the prefix (including its trailing dot); the remainder is the topic.
    return uri[len(marker):]

59 

60 

# Topic prefixes EXCLUDED from SSE fan-out. Every emit reaches local
# listeners + WAMP + SSE unless its topic starts with one of these
# prefixes. Add a prefix here ONLY when a topic proves too noisy /
# internal for end-clients (high-frequency tick events, per-token
# streaming, debug probes). Most platform topics (theme.*, resonance.*,
# federation.*, inference.*, memory.*, action_state.*) are valid SSE
# traffic for admin dashboards / telemetry views, so they stay on by
# default.
#
# 'bus.': MessageBus.publish auto-emits a `bus.<topic>` echo of every
# publish (core/peer_link/message_bus.py:367) for HARTOS-internal
# cross-subsystem subscribers. These are NOT meant for the SPA — the
# canonical SSE delivery is the SEPARATE `_route_sse` leg in the same
# publish() call (line 373) which calls broadcast_sse_safe with the
# RAW topic. Without this denylist entry the SSE bridge at the end of
# EventBus.emit() fires broadcast_sse_safe AGAIN with `bus.<topic>` —
# two SSE events for one publish, with different `type=` keys, defeating
# the SPA's msg_id||request_id dedup keys (which diverge across the
# two envelope shapes). Live evidence 2026-05-10 20:28:29 showed
# TTS audio playing twice on the same chat turn caused by exactly
# this dual-bridge race (bus.chat.pupit + chat.pupit + message all
# fired in 10ms). Keeping the denylist entry keeps `bus.*` events
# HARTOS-internal while preserving the canonical SSE leg.
_SSE_DENYLIST_PREFIXES: tuple = ('bus.',)

84 

85 

def _topic_targets_sse(topic: str) -> bool:
    """Report whether `topic` should be forwarded to SSE.

    A topic is forwarded unless it starts with one of the prefixes in
    _SSE_DENYLIST_PREFIXES.
    """
    for denied_prefix in _SSE_DENYLIST_PREFIXES:
        if topic.startswith(denied_prefix):
            return False
    return True

89 

90 

class EventBus:
    """Topic-based pub/sub event bus with optional Crossbar WAMP bridge.

    The decoupling layer for HART OS — subsystems communicate through
    events instead of direct imports. When a WAMP session is connected,
    every local emit() also publishes to Crossbar, and WAMP subscriptions
    fire local callbacks, enabling cross-process and cross-device events.

    Listener tables and the emit counter are guarded by one
    ``threading.Lock``. Callbacks run outside the lock, in the thread
    that called emit().
    """

    def __init__(self):
        self._listeners: Dict[str, List[Callable]] = {}
        self._wildcard_listeners: Dict[str, List[Callable]] = {}
        self._lock = threading.Lock()
        self._emit_count: int = 0
        # WAMP bridge state
        self._wamp_session = None
        self._wamp_connected = False
        self._wamp_loop: Optional[asyncio.AbstractEventLoop] = None
        self._wamp_thread: Optional[threading.Thread] = None
        # Set by disconnect_wamp() to end the reconnect loop; initialised
        # here so the attribute exists before connect_wamp() is ever called.
        self._wamp_stop: bool = False
        self._wamp_subscribed_topics: set = set()
        self._bridged_topics: set = set()  # topics currently bridged to WAMP

    def on(self, topic: str, callback: Callable) -> None:
        """Subscribe to a topic.

        Args:
            topic: Event topic. Use '*' for wildcard matching:
                   'theme.*' matches 'theme.changed', 'theme.preset.applied'
                   '*' matches everything.
            callback: Called with (topic, data) when event fires.
        """
        with self._lock:
            # Any topic containing '*' is stored as an fnmatch pattern.
            target = self._wildcard_listeners if '*' in topic else self._listeners
            target.setdefault(topic, []).append(callback)

    def off(self, topic: str, callback: Callable) -> None:
        """Unsubscribe from a topic.

        Unknown topics and callbacks that were never registered are
        ignored silently.

        Args:
            topic: Same topic string used in on().
            callback: Same callback reference used in on().
        """
        with self._lock:
            target = (self._wildcard_listeners if '*' in topic
                      else self._listeners)
            if topic in target:
                try:
                    target[topic].remove(callback)
                    if not target[topic]:
                        # Drop empty lists so topics() stays accurate.
                        del target[topic]
                except ValueError:
                    pass

    def once(self, topic: str, callback: Callable) -> None:
        """Subscribe to a topic for one event only.

        After the first matching event, the callback is automatically removed.
        """
        def wrapper(t, data):
            # Unsubscribe first so a re-entrant emit from the callback
            # does not trigger it again.
            self.off(topic, wrapper)
            callback(t, data)
        self.on(topic, wrapper)

    def emit(self, topic: str, data: Any = None, _from_wamp: bool = False) -> int:
        """Emit an event synchronously.

        Listeners run in the caller's thread; an exception raised by one
        listener is logged and does not stop the remaining listeners.

        Args:
            topic: Event topic (e.g., 'config.display.scale').
            data: Event payload (any JSON-serializable value, typically dict).
                  A dict payload is mutated in place when msg_id is injected.
            _from_wamp: Internal flag — True when event originated from WAMP
                        (prevents echo loop back to Crossbar and SSE).

        Cross-transport dedup contract:
          - Each emit() injects a unique ``msg_id`` (uuid4 hex) into the
            data dict if the caller didn't set one.  This is the
            per-event dedup key clients use to suppress duplicates that
            arrive via multiple transports (WAMP + SSE).
          - ``request_id`` is the per-request GROUPING key (multiple
            thinking events share one request_id but each has its own
            msg_id).  Clients use request_id for filtering daemon traces
            and grouping into thinking containers, NOT for dedup.
          - For proactive emits with no request_id (agent self-initiated
            thinking, telemetry pushes), msg_id is the ONLY id needed.
            Each event renders independently because msg_ids are unique.

        Returns:
            Number of listeners that were called without raising.
        """
        # Inject per-event dedup id (uuid4 hex).  Skipped if caller
        # already supplied msg_id — they may want to use a domain-
        # specific stable id for replay-on-reconnect scenarios.
        if isinstance(data, dict) and 'msg_id' not in data:
            import uuid as _uuid
            data['msg_id'] = _uuid.uuid4().hex

        # Snapshot the listeners under the lock, then call them outside it
        # so callbacks may freely call on()/off()/emit().
        with self._lock:
            # Counted under the lock: emit_async() calls emit() from
            # several threads at once.
            self._emit_count += 1
            exact = list(self._listeners.get(topic, []))
            wildcards = []
            for pattern, cbs in self._wildcard_listeners.items():
                # fnmatchcase: topics are case-sensitive on every platform
                # (fnmatch.fnmatch would case-fold on Windows).
                if fnmatch.fnmatchcase(topic, pattern):
                    wildcards.extend(cbs)

        called = 0

        for cb in exact:
            try:
                cb(topic, data)
                called += 1
            except Exception as e:
                logger.warning("Event listener error on '%s': %s", topic, e)

        for cb in wildcards:
            try:
                cb(topic, data)
                called += 1
            except Exception as e:
                logger.warning("Wildcard listener error on '%s': %s", topic, e)

        # Bridge to WAMP (skip if event already came from WAMP → no echo)
        if not _from_wamp and self._wamp_connected and self._wamp_session:
            self._publish_to_wamp(topic, data)

        # Bridge to SSE (Nunba desktop / Android web view) through
        # broadcast_sse_safe.  Topic policy is a DENYLIST (see
        # _SSE_DENYLIST_PREFIXES at module top) — every topic fans out
        # to SSE unless it starts with a denied prefix; add a prefix
        # only when a topic proves too noisy / internal for end-clients.
        # Per-event dedup happens client-side via msg_id (auto-injected
        # above for dict payloads), so the same event arriving via WAMP
        # and SSE renders once.  Echo guard: skip when the event came
        # from WAMP so a WAMP→local→SSE round trip doesn't double-deliver
        # a message that the WAMP bridge already shipped on its own
        # connection.
        if not _from_wamp and _topic_targets_sse(topic):
            _user_id = data.get('user_id') if isinstance(data, dict) else None
            broadcast_sse_safe(topic, data, user_id=_user_id)

        return called

    def emit_async(self, topic: str, data: Any = None) -> None:
        """Emit an event asynchronously (fire-and-forget in a thread).

        Uses a daemon thread so it won't block shutdown.
        """
        t = threading.Thread(target=self.emit, args=(topic, data), daemon=True)
        t.start()

    def has_listeners(self, topic: str) -> bool:
        """Check if a topic has any subscribers (exact or wildcard)."""
        with self._lock:
            if self._listeners.get(topic):
                return True
            # Same case-sensitive matching as emit().
            return any(fnmatch.fnmatchcase(topic, pattern)
                       for pattern in self._wildcard_listeners)

    def topics(self) -> List[str]:
        """Return all topics with registered listeners (exact first, then wildcard)."""
        with self._lock:
            exact = list(self._listeners.keys())
            wild = list(self._wildcard_listeners.keys())
        return exact + wild

    def clear(self) -> None:
        """Remove all listeners. For testing."""
        with self._lock:
            self._listeners.clear()
            self._wildcard_listeners.clear()

    # ─── WAMP / Crossbar Bridge ─────────────────────────────

    @staticmethod
    def _schedule_on_loop(loop, failure_message: str, func: Callable, *args) -> None:
        """Run ``func(*args)`` on ``loop``'s own thread (fire-and-forget).

        The call is wrapped in a coroutine so that it executes inside the
        event-loop thread instead of the caller's thread, and so that
        ``asyncio.run_coroutine_threadsafe`` always receives a real
        coroutine object.  Failures are logged at debug level as
        ``"<failure_message>: <error>"`` and never propagate.
        """
        async def _invoke():
            try:
                result = func(*args)
                # Only coroutines must be awaited to run at all; a returned
                # Future is already in flight and is deliberately not waited on.
                if asyncio.iscoroutine(result):
                    await result
            except Exception as e:
                logger.debug("%s: %s", failure_message, e)

        coro = _invoke()
        try:
            asyncio.run_coroutine_threadsafe(coro, loop)
        except Exception as e:
            # Loop already closed/stopped: discard the coroutine cleanly
            # to avoid a "never awaited" warning.
            coro.close()
            logger.debug("%s: %s", failure_message, e)

    def connect_wamp(self, url: Optional[str] = None, realm: Optional[str] = None) -> bool:
        """Connect EventBus to Crossbar WAMP router.

        Local events are published to WAMP; WAMP events fire local callbacks.
        Uses autobahn (same as crossbar_server.py / wamp_bridge.py).  The
        component runs in a daemon thread with its own event loop and is
        restarted with exponential backoff (1s doubling up to 60s).

        Args:
            url: WebSocket URL (default: CBURL env or ws://localhost:8088/ws)
            realm: WAMP realm (default: CBREALM env or realm1)

        Returns:
            True if connection initiated (async — may not be connected yet),
            False if autobahn is not installed.
        """
        try:
            from autobahn.asyncio.component import Component
        except ImportError:
            logger.warning("autobahn not installed — WAMP bridge unavailable")
            return False

        url = url or os.environ.get('CBURL', 'ws://localhost:8088/ws')
        realm = realm or os.environ.get('CBREALM', 'realm1')

        component = Component(transports=url, realm=realm)
        bus = self  # closure capture

        @component.on_join
        async def on_join(session, details):
            bus._wamp_session = session
            bus._wamp_connected = True
            logger.info("EventBus WAMP bridge connected to %s (realm=%s)", url, realm)

            # Subscribe to the wildcard topic for all HARTOS events
            wamp_wildcard = f'{WAMP_TOPIC_PREFIX}.'
            try:
                # autobahn expects a SubscribeOptions instance, and
                # details_arg is what makes it pass the event details
                # (including the concrete topic) to _on_wamp_event.
                from autobahn.wamp.types import SubscribeOptions
                await session.subscribe(
                    bus._on_wamp_event, wamp_wildcard,
                    options=SubscribeOptions(match='prefix', details_arg='details'),
                )
                logger.info("EventBus subscribed to WAMP prefix: %s", wamp_wildcard)
            except Exception as e:
                logger.warning("WAMP wildcard subscribe failed: %s", e)

        @component.on_leave
        async def on_leave(session, details):
            bus._wamp_connected = False
            bus._wamp_session = None
            logger.info("EventBus WAMP bridge disconnected")

        # Run WAMP component in a background thread with its own event loop
        # Reconnects with exponential backoff on disconnect/failure
        bus._wamp_stop = False

        def _run():
            import time as _time
            backoff = 1
            max_backoff = 60
            while not bus._wamp_stop:
                loop = asyncio.new_event_loop()
                bus._wamp_loop = loop
                asyncio.set_event_loop(loop)
                started_at = _time.time()
                try:
                    loop.run_until_complete(component.start(loop=loop))
                except Exception as e:
                    logger.warning("WAMP component exited: %s — reconnecting in %ds", e, backoff)
                finally:
                    bus._wamp_loop = None
                    bus._wamp_connected = False
                    bus._wamp_session = None
                    # A fresh loop is created per attempt; close this one so
                    # its selector/file descriptors are not leaked.
                    try:
                        loop.close()
                    except Exception as e:
                        logger.debug("WAMP event loop close failed: %s", e)
                if bus._wamp_stop:
                    # disconnect_wamp() was called — exit without the backoff sleep.
                    break
                # Reset backoff if connection lived > 60s (was a real session)
                if (_time.time() - started_at) > 60:
                    backoff = 1
                _time.sleep(backoff)
                backoff = min(backoff * 2, max_backoff)

        self._wamp_thread = threading.Thread(target=_run, daemon=True, name='eventbus-wamp')
        self._wamp_thread.start()
        return True

    def disconnect_wamp(self):
        """Disconnect from Crossbar WAMP router.

        Stops the reconnect loop and asks the current session (if any)
        to leave; the leave call is executed on the WAMP loop thread.
        """
        self._wamp_stop = True
        self._wamp_connected = False
        session = self._wamp_session
        loop = self._wamp_loop
        self._wamp_session = None
        if session and loop:
            self._schedule_on_loop(loop, "WAMP session leave failed", session.leave)
        logger.info("EventBus WAMP bridge disconnected")

    def _publish_to_wamp(self, topic: str, data: Any):
        """Publish a local event to WAMP (fire-and-forget).

        The payload is round-tripped through JSON (non-serializable values
        become strings) and ``session.publish`` is executed on the WAMP
        loop thread rather than in the emitter's thread.
        """
        session = self._wamp_session
        loop = self._wamp_loop
        if not session or not loop:
            return
        wamp_uri = _local_to_wamp(topic)
        # Serialize data to JSON-safe dict for WAMP transport
        try:
            payload = json.loads(json.dumps(data, default=str)) if data is not None else {}
        except (TypeError, ValueError):
            payload = {'value': str(data)}
        self._schedule_on_loop(
            loop, f"WAMP publish failed for {wamp_uri}",
            session.publish, wamp_uri, payload,
        )

    async def _on_wamp_event(self, *args, **kwargs):
        """Handle incoming WAMP event → dispatch to local listeners."""
        # autobahn passes positional args; first is payload, details in kwargs
        payload = args[0] if args else kwargs
        details = kwargs.get('details')
        # Extract the WAMP topic from details
        wamp_topic = getattr(details, 'topic', None) if details else None
        if not wamp_topic:
            return
        local_topic = _wamp_to_local(wamp_topic)
        if local_topic:
            # Dispatch locally, but mark _from_wamp to prevent echo
            self.emit(local_topic, payload, _from_wamp=True)

    @property
    def wamp_connected(self) -> bool:
        """Whether the WAMP bridge is currently connected."""
        return self._wamp_connected

    # ─── Properties & Health ──────────────────────────────────

    @property
    def emit_count(self) -> int:
        """Total number of emit() calls since creation."""
        return self._emit_count

    def health(self) -> dict:
        """Health report for ServiceRegistry integration.

        Returns a dict with 'status', 'listeners' (total callbacks),
        'topics' (distinct exact + wildcard topics), 'total_emits' and
        'wamp_connected'.
        """
        with self._lock:
            exact_count = sum(len(v) for v in self._listeners.values())
            wild_count = sum(len(v) for v in self._wildcard_listeners.values())
            topic_count = len(self._listeners) + len(self._wildcard_listeners)
            total_emits = self._emit_count
        return {
            'status': 'ok',
            'listeners': exact_count + wild_count,
            'topics': topic_count,
            'total_emits': total_emits,
            'wamp_connected': self._wamp_connected,
        }

420 

421 

422# ─── Module-level helper — safe emit without circular imports ───── 

423 

def emit_event(topic: str, data: Any = None, async_: bool = True) -> None:
    """Emit an event on the platform EventBus (if bootstrapped).

    Safe to call from anywhere — no-ops if the registry has no 'events'
    service. The registry is imported lazily inside the function to avoid
    circular imports. Uses emit_async by default to avoid blocking the
    caller.

    Args:
        topic: Dot-separated topic (e.g., 'theme.changed', 'resonance.tuned')
        data: JSON-serializable payload
        async_: If True (default), emit in a background thread

    Never raises: any failure while resolving the bus or emitting is
    logged at debug level and otherwise ignored.
    """
    try:
        from core.platform.registry import get_registry
        registry = get_registry()
        if not registry.has('events'):
            return
        bus = registry.get('events')
        if async_:
            bus.emit_async(topic, data)
        else:
            bus.emit(topic, data)
    except Exception as e:
        # Never block callers — event emission is best-effort. Leave a
        # debug trace so a broken registry/bus is not completely invisible.
        logger.debug("emit_event skipped for '%s': %s", topic, e)

447 

448 

def broadcast_sse_safe(event_type: str, data: Any, user_id: Any = None) -> bool:
    """Push one payload to SSE subscribers, best-effort.

    Looks up a callable named ``broadcast_sse_event`` first on the
    ``__main__`` module and, if it is absent there, on
    ``sys.modules['main_module']``.  Centralising this lookup means
    callers do not each have to repeat the same import/fallback block.

    Args:
        event_type: SSE ``event:`` type string (e.g. ``'notification'``,
            ``'message'``, ``'capability_update'``).
        data: JSON-serializable payload dict.
        user_id: Forwarded to the broadcast function as ``user_id=``;
            ``None`` is documented as "broadcast to every connected SSE
            client".

    Returns:
        ``True`` when a broadcast function was found and called without
        raising, ``False`` otherwise.  Never raises — any exception is
        logged at debug level and reported as ``False``.
    """
    try:
        import sys as _sys
        import __main__ as _entry_module

        sender = getattr(_entry_module, 'broadcast_sse_event', None)
        if sender is None:
            # Fallback: the app may be registered as 'main_module' instead.
            fallback_module = _sys.modules.get('main_module')
            if fallback_module is not None:
                sender = getattr(fallback_module, 'broadcast_sse_event', None)

        if sender is not None:
            sender(event_type, data, user_id=user_id)
            return True
        return False
    except Exception as e:
        logger.debug("SSE broadcast skipped: %s", e)
        return False