Coverage for core / peer_link / message_bus.py: 93.0%

186 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2MessageBus — unified publish/subscribe across all transports. 

3 

4Every publish routes to ALL available transports (each leg attempted independently, in order): 

5 1. LOCAL EventBus — always available, same-process delivery 

6 2. SSE — same-machine cross-process (Flask → frontend), always available 

7 3. PEERLINK — encrypted direct links to peers (when connected) 

8 4. CROSSBAR — central telemetry + legacy mobile push (when internet available) 

9 

10Works at every level: 

11 Single device offline → LOCAL + SSE 

12 Multi-device LAN → LOCAL + SSE + PEERLINK (plain, same-user) 

13 Multi-device WAN → LOCAL + SSE + PEERLINK (encrypted) + CROSSBAR 

14 Full hive → LOCAL + SSE + PEERLINK (encrypted) + CROSSBAR 

15 

16The SSE leg fans out to the same-machine frontend via the Flask SSE 

17broker (see ``core.platform.events.broadcast_sse_safe``). It is the 

18only delivery path that does NOT depend on Crossbar / WAMP being up 

19— so any chat upgrade, dashboard invalidate, or capability event 

20remains visible to the local UI even when the WAMP router refuses 

21connection. Without the SSE leg every caller would have to bolt on 

22``broadcast_sse_safe`` by hand and inevitably forget it (the 

23``_deliver_expert_to_user_async`` regression is exactly that 

24shape — expert reply published to WAMP only, lost when Crossbar 

25was down). 

26 

27Dedup: message_id LRU set prevents double delivery when message 

28arrives via multiple transports. 

29 

30Topic mapping: 

31 New topics use dot-notation: 'chat.response', 'task.progress' 

32 Legacy Crossbar topics: 'com.hertzai.hevolve.chat.{user_id}' 

33 Mapping is bidirectional for backward compatibility. 

34""" 

35import json 

36import logging 

37import os 

38import threading 

39import time 

40import uuid 

41from collections import OrderedDict 

42from typing import Any, Callable, Dict, List, Optional 

43 

logger = logging.getLogger('hevolve.peer_link')


# Legacy topic mapping: new → old Crossbar topic template.
# Placeholders such as {user_id} are filled at publish time from the
# publish() arguments or the data dict.
TOPIC_MAP = {
    # Per-user chat topics (frontend crossbarWorker.js subscribes to these)
    'chat.response': 'com.hertzai.hevolve.chat.{user_id}',
    'chat.action': 'com.hertzai.hevolve.action.{user_id}',
    'chat.general': 'com.hertzai.hevolve.{user_id}',
    'chat.analogy': 'com.hertzai.hevolve.analogy.{user_id}',
    'chat.social': 'com.hertzai.hevolve.social.{user_id}',
    'chat.pupit': 'com.hertzai.pupit.{user_id}',
    # Book parsing (percentage progress → frontend progress bar)
    'book.parsing': 'com.hertzai.bookparsing.{user_id}',
    # Task lifecycle (server-side tracking)
    'task.progress': 'com.hertzai.longrunning.log',
    'task.confirmation': 'com.hertzai.hevolve.confirmation',
    'task.exception': 'com.hertzai.hevolve.exception',
    'task.timeout': 'com.hertzai.hevolve.timeout',
    'task.intermediate': 'com.hertzai.hevolve.intermediate',
    'task.error': 'com.hertzai.hevolve.error',
    'task.actions': 'com.hertzai.hevolve.actions',
    'task.probe': 'com.hertzai.hevolve.probe',
    # Mobile / push
    'mobile.push': 'com.hertzai.hevolve.pupitpublish',
    # Agent coordination — per-user suffix mirrors chat/action/vision/...
    # convention so WAMP router ACL (#246) can gate this topic too.
    # (#510 follow-up — was a global topic before.)
    'agent.multichat': 'com.hertzai.hevolve.agent.multichat.{user_id}',
    # Game sessions
    'game.session': 'com.hertzai.hevolve.game.{session_id}',
    # Community
    'community.message': 'com.hertzai.hevolve.community.{community_id}',
    'community.feed': 'com.hertzai.community.feed',
    # Fleet commands (RN subscribes for TTS, agent consent, game dispatch)
    'fleet.command': 'com.hertzai.hevolve.fleet.{device_id}',
    # Fleet commands targeting all devices for a user (fan-out when device_id unknown)
    'fleet.command.user': 'com.hertzai.hevolve.fleet.user.{user_id}',
    # Mock interview (RN only)
    'mock_interview': 'com.hertzai.mock_interview.{user_id}',
    # Telemetry (node → central only, metadata, never content)
    'telemetry.node': 'com.hartos.telemetry.{node_id}',
    # Compute routing status (client shows real-time routing info)
    'compute.routing': 'com.hertzai.hevolve.compute.routing.{user_id}',
    # Compute relay — phone→HARTOS request + HARTOS→phone response (NAT traversal)
    'compute.request': 'com.hertzai.hevolve.compute.request.{user_id}',
    'compute.response': 'com.hertzai.hevolve.compute.response.{user_id}',
    # Remote desktop
    'remote_desktop.signal': 'com.hartos.remote_desktop.signal.{device_id}',
}

# Reverse lookup: legacy topic prefix → new topic.
# The prefix is the template with its trailing '.{placeholder}' part cut off;
# templates without a placeholder are used whole.
# Built with comprehensions so no loop variables leak into the module
# namespace (a module-level ``for`` would export them via ``import *``).
_REVERSE_MAP_UNSORTED = {
    legacy_template.partition('.{')[0]: bus_topic
    for bus_topic, legacy_template in TOPIC_MAP.items()
}
# Sorted by prefix length (longest first) so 'com.hertzai.hevolve.chat'
# matches before the shorter 'com.hertzai.hevolve' (chat.general).
_REVERSE_MAP = dict(
    sorted(_REVERSE_MAP_UNSORTED.items(), key=lambda item: -len(item[0]))
)

104 

105 

def resolve_legacy_topic(legacy_topic: str):
    """Translate a legacy Crossbar topic into ``(bus_topic, suffix)``.

    This is the SINGLE source of truth for legacy→bus topic resolution.
    Consumers: hart_intelligence.publish_async(), receive_from_crossbar().

    Prefixes are tried longest-first. A prefix matches when the topic equals
    it exactly (suffix is '') or continues with a '.' (suffix is everything
    after that dot, typically a user_id).

    Returns:
        (bus_topic, suffix) on a match, otherwise (None, '').
    """
    for known_prefix, mapped_topic in _REVERSE_MAP.items():
        if legacy_topic == known_prefix:
            return mapped_topic, ''
        dotted_prefix = known_prefix + '.'
        if legacy_topic.startswith(dotted_prefix):
            return mapped_topic, legacy_topic[len(dotted_prefix):]
    return None, ''

123 

124 

def chat_topic_for(user_id: str) -> str:
    """Build the legacy per-user WAMP chat-bubble topic.

    Central home for the ``com.hertzai.hevolve.chat.<user_id>`` name that
    used to be spelled inline at each publish call site. The result is
    byte-identical to the old inline f-string, so no subscriber (Android
    RN, Web SPA, Nunba adapter) sees any wire change. The helper exists
    purely so the legacy name lives in one place until it is retired.
    """
    return 'com.hertzai.hevolve.chat.{}'.format(user_id)

139 

140 

class _LRUDedup:
    """Bounded, thread-safe LRU set of message IDs for deduplication.

    ``check_and_add`` is O(1). Once more than ``maxsize`` IDs are held, the
    least recently *seen* ID is evicted. A duplicate hit refreshes the ID's
    recency, so an ID still arriving via several transports is not evicted
    ahead of IDs seen only once (previously hits did not refresh, making
    this FIFO despite the name).
    """

    def __init__(self, maxsize: int = 10000):
        self._cache: OrderedDict = OrderedDict()  # msg_id -> True, oldest first
        self._maxsize = maxsize
        self._lock = threading.Lock()

    def check_and_add(self, msg_id: str) -> bool:
        """Record ``msg_id`` and report whether it was new.

        Returns:
            True if ``msg_id`` had not been seen (it is now recorded);
            False if it is a duplicate (its recency is refreshed).
        """
        with self._lock:
            if msg_id in self._cache:
                self._cache.move_to_end(msg_id)  # mark as most recently used
                return False  # Duplicate
            self._cache[msg_id] = True
            if len(self._cache) > self._maxsize:
                self._cache.popitem(last=False)  # evict least recently used
            return True  # New

158 

159 

class MessageBus:
    """Unified pub/sub across LOCAL + SSE + PEERLINK + CROSSBAR.

    Usage:
        bus = get_message_bus()
        bus.subscribe('chat.response', handler)
        bus.publish('chat.response', {'user_id': '123', 'text': 'Hello'})
    """

    def __init__(self):
        # topic (exact name or fnmatch-style wildcard pattern) -> handlers
        self._subscriptions: Dict[str, List[Callable]] = {}
        self._lock = threading.Lock()  # guards _subscriptions
        self._dedup = _LRUDedup(maxsize=10000)
        self._http_transport: Optional[Callable] = None  # injected Crossbar HTTP fallback
        # Diagnostic counters; incremented without a lock, so best-effort.
        self._stats = {
            'published': 0,
            'delivered_local': 0,
            'delivered_sse': 0,
            'delivered_peerlink': 0,
            'delivered_crossbar': 0,
            'deduplicated': 0,
        }

    def set_http_transport(self, transport_fn: Callable) -> None:
        """Inject HTTP Crossbar transport (avoids layering violation).

        Called by hart_intelligence at startup to provide the HTTP publish
        fallback without MessageBus importing from hart_intelligence.

        Args:
            transport_fn: callable(topic: str, payload: str) -> None
        """
        self._http_transport = transport_fn

    def publish(self, topic: str, data: dict = None,
                user_id: str = '', device_id: str = '',
                skip_crossbar: bool = False,
                skip_peerlink: bool = False,
                skip_sse: bool = False) -> str:
        """Publish a message to all available transports.

        Fan-out order (each leg is independent — failure in one never
        blocks the others):

          1. LOCAL    — same-process subscribers + EventBus (always)
          2. SSE      — same-machine Flask → frontend, no network (always)
          3. PEERLINK — P2P device-to-device (BLE / local Wi-Fi) (best-effort)
          4. CROSSBAR — WAMP / cross-network (best-effort)

        SSE is treated like LOCAL trust-wise (same-machine, loopback
        only, MCP-token gated) so payloads pass through unredacted.
        Outbound legs (PEERLINK + CROSSBAR) get the DLP scrub.

        SUBTLE — Nunba's adapter does NOT see direct ``bus.publish``
        calls. Nunba's ``routes/hartos_backend_adapter.py``
        monkey-patches ``hart_intelligence.publish_async``, not this
        method. Callers that go directly through the bus
        (``bus.publish(...)``) bypass that monkey-patch — Nunba's
        per-request thinking-trace buffer never sees those messages.
        For chat-bubble publishes, prefer
        ``hart_intelligence.publish_async(chat_topic_for(user_id),
        json.dumps(payload))`` so the interceptor still fires.
        Migration to a bus subscriber on ``chat.response`` is the
        right long-term shape (tracked in
        ``memory/project_publish_aop_migration.md``).

        Args:
            topic: Dot-notation topic (e.g., 'chat.response')
            data: Message payload (JSON-serializable dict). It is
                shallow-copied; the caller's dict is never mutated.
            user_id: For per-user topic routing (substituted into legacy
                topics) and added to the payload as ``user_id`` if absent.
            device_id: For per-device topic routing
            skip_crossbar: Don't publish to Crossbar (for local-only events)
            skip_peerlink: Don't publish to PeerLink (for same-process events)
            skip_sse: Don't publish to the same-machine SSE broker
                (for events that should NEVER reach the local UI, e.g.
                pure server-to-server telemetry). Default False so
                every existing caller automatically gains the SSE leg.

        Returns:
            Message ID (for dedup/tracking)
        """
        # Copy so the setdefault below never leaks into the caller's dict.
        data = dict(data) if data else {}
        msg_id = uuid.uuid4().hex[:16]

        if user_id:
            data.setdefault('user_id', user_id)

        self._stats['published'] += 1

        # 1. LOCAL — always deliver (unredacted, same process)
        self._route_local(topic, data, msg_id)

        # 2. SSE — same-machine cross-process, unredacted (same trust as
        # LOCAL). The helper no-ops if the SSE broker isn't set up yet
        # (tests, pre-Flask boot). Keeps the local UI working when
        # Crossbar / WAMP is down.
        if not skip_sse:
            self._route_sse(topic, data, user_id, msg_id)

        if skip_peerlink and skip_crossbar:
            return msg_id  # No outbound legs — skip the DLP scrub entirely

        # Redact secrets before outbound transmission (PeerLink + Crossbar)
        outbound_data = self._redact_outbound(data)

        # 3. PEERLINK — if connected peers exist
        if not skip_peerlink:
            self._route_peerlink(topic, outbound_data, msg_id)

        # 4. CROSSBAR — if internet available (and not skipped)
        if not skip_crossbar:
            self._route_crossbar(topic, outbound_data, user_id, device_id, msg_id)

        return msg_id

    def subscribe(self, topic: str, handler: Callable) -> None:
        """Subscribe to a topic.

        Handler signature: handler(topic: str, data: dict)
        Supports wildcard: 'chat.*' matches 'chat.response', 'chat.action'
        (fnmatch-style pattern, matched case-sensitively).
        """
        with self._lock:
            self._subscriptions.setdefault(topic, []).append(handler)

    def unsubscribe(self, topic: str, handler: Callable) -> None:
        """Remove one registration of ``handler`` for ``topic``; no-op if absent."""
        with self._lock:
            handlers = self._subscriptions.get(topic, [])
            if handler in handlers:
                handlers.remove(handler)

    def receive_from_peer(self, envelope: dict) -> bool:
        """Handle message received via PeerLink.

        Deduplicates and delivers to local subscribers.
        Called by ChannelDispatcher when 'events' channel message arrives.

        Returns:
            True if delivered; False if the envelope has no msg_id or the
            message was already delivered via another transport.
        """
        msg_id = envelope.get('msg_id', '')
        if not msg_id:
            return False

        if not self._dedup.check_and_add(msg_id):
            self._stats['deduplicated'] += 1
            return False  # Already delivered via another transport

        topic = envelope.get('topic', '')
        data = envelope.get('data', {})

        self._deliver_to_subscribers(topic, data)
        return True

    def receive_from_crossbar(self, legacy_topic: str, message: Any) -> bool:
        """Handle message received via Crossbar (legacy path).

        Maps the legacy topic to a bus topic with ``resolve_legacy_topic``
        (unknown topics pass through unchanged) and delivers to local
        subscribers. Non-dict messages are delivered as ``{'raw': str(message)}``.

        The dedup ID is taken from a dict message's ``msg_id``, or parsed out
        of a JSON-string payload (the shape ``_route_crossbar`` publishes), so
        echoes of this node's own publishes are suppressed. Otherwise a
        fresh ID is generated.

        Returns:
            True if delivered, False if it was a duplicate.
        """
        new_topic, _suffix = resolve_legacy_topic(legacy_topic)
        if not new_topic:
            new_topic = legacy_topic  # Pass through unknown topics

        data = message if isinstance(message, dict) else {'raw': str(message)}

        msg_id = (data.get('msg_id', '')
                  or self._embedded_msg_id(message)
                  or uuid.uuid4().hex[:16])
        if not self._dedup.check_and_add(msg_id):
            self._stats['deduplicated'] += 1
            return False

        self._deliver_to_subscribers(new_topic, data)
        return True

    def get_stats(self) -> dict:
        """Return a snapshot copy of the delivery counters."""
        return dict(self._stats)

    # ─── Internal routing ────────────────────────────────

    @staticmethod
    def _embedded_msg_id(message: Any) -> str:
        """Return the string ``msg_id`` inside a JSON-text payload, else ''."""
        if not isinstance(message, (str, bytes, bytearray)):
            return ''
        try:
            parsed = json.loads(message)
        except (ValueError, TypeError, RecursionError):
            return ''
        if isinstance(parsed, dict):
            embedded = parsed.get('msg_id')
            if isinstance(embedded, str):
                return embedded
        return ''

    def _redact_outbound(self, data: dict) -> dict:
        """Return ``data`` with PII scrubbed for the outbound legs.

        Best-effort: when the DLP engine is not installed, or redaction
        fails (e.g. the payload is not JSON-serializable), ``data`` is
        returned unredacted.
        """
        try:
            from security.dlp_engine import redact_pii
            raw = json.dumps(data)
            redacted = redact_pii(raw)
            if redacted != raw:
                return json.loads(redacted)
        except ImportError:
            pass  # DLP not available — proceed unredacted
        except Exception as e:
            logger.debug(f"DLP redaction failed, sending unredacted: {e}")
        return data

    def _route_local(self, topic: str, data: dict, msg_id: str):
        """Deliver to local EventBus + direct subscribers."""
        # Mark as seen so the same msg_id echoed back via PeerLink/Crossbar
        # is dropped by the receive_* paths.
        self._dedup.check_and_add(msg_id)

        # Direct subscribers
        self._deliver_to_subscribers(topic, data)

        # Also emit to EventBus (for cross-subsystem communication)
        try:
            from core.platform.events import emit_event
            emit_event(f'bus.{topic}', data)
        except Exception:
            pass

        self._stats['delivered_local'] += 1

    def _route_sse(self, topic: str, data: dict, user_id: str, msg_id: str):
        """Push to the same-machine SSE broker (Flask → frontend).

        Uses ``core.platform.events.broadcast_sse_safe`` which encapsulates
        the ``import __main__`` + ``sys.modules['main_module']`` fallback
        chain (Nunba's SSE registry lives on ``__main__``). The helper
        returns ``False`` if the SSE broker hasn't been wired up yet —
        we increment the stat only on confirmed delivery so test runs
        and pre-Flask boot don't inflate the counter.

        Event type is the bus topic itself (1:1 mapping). Frontends
        listening for legacy event names (``'notification'``,
        ``'message'``, ``'capability_update'``, etc.) will keep working
        because those callers continue to invoke ``broadcast_sse_safe``
        directly with their legacy event_type — this bus leg is purely
        additive for callers that didn't have an SSE leg before.

        Best-effort — never raises. An SSE failure must not block the
        outbound legs (PEERLINK / CROSSBAR) or the LOCAL deliveries that
        already happened.
        """
        try:
            from core.platform.events import broadcast_sse_safe
        except Exception:
            return  # core.platform.events not importable — fail-closed silent
        try:
            delivered = broadcast_sse_safe(
                topic, data, user_id=(user_id or None))
        except Exception as e:
            logger.debug(f"SSE route failed for {topic}: {e}")
            return
        if delivered:
            self._stats['delivered_sse'] += 1

    def _route_peerlink(self, topic: str, data: dict, msg_id: str):
        """Send to connected peers via PeerLink 'events' channel (best-effort)."""
        try:
            from core.peer_link.link_manager import get_link_manager
            mgr = get_link_manager()

            envelope = {
                'msg_id': msg_id,
                'topic': topic,
                'data': data,
            }

            sent = mgr.broadcast('events', envelope)
            if sent > 0:
                self._stats['delivered_peerlink'] += sent
        except Exception as e:
            # No PeerLink available — that's fine
            logger.debug(f"PeerLink route skipped for {topic}: {e}")

    def _route_crossbar(self, topic: str, data: dict,
                        user_id: str, device_id: str, msg_id: str):
        """Publish to Crossbar for legacy mobile app + central telemetry.

        Skips silently when the topic has no legacy mapping or a template
        placeholder cannot be filled. Tries the native WAMP session first
        and falls back to the injected HTTP transport. Best-effort — never
        raises.
        """
        legacy_topic = TOPIC_MAP.get(topic)
        if not legacy_topic:
            return  # No legacy mapping — skip Crossbar

        # Substitute template variables from the explicit args / data dict
        import re
        for key in re.findall(r'\{(\w+)\}', legacy_topic):
            if key == 'user_id':
                val = user_id or data.get('user_id', '')
            elif key == 'device_id':
                val = device_id or data.get('device_id', '')
            else:
                val = data.get(key, '')
            if not val:
                return  # Can't route without required variable
            legacy_topic = legacy_topic.replace(f'{{{key}}}', str(val))

        # Add msg_id so receivers (incl. receive_from_crossbar) can dedup
        if isinstance(data, dict):
            data = dict(data)
            data['msg_id'] = msg_id

        try:
            payload = (json.dumps(data, separators=(',', ':'))
                       if isinstance(data, dict) else str(data))
        except (TypeError, ValueError) as e:
            logger.warning(f"MessageBus: payload for {topic} is not "
                           f"JSON-serializable, skipping Crossbar: {e}")
            return

        # Try native WAMP session first (crossbar_server is optional)
        try:
            from crossbar_server import wamp_session
            if wamp_session:
                import asyncio
                asyncio.ensure_future(wamp_session.publish(legacy_topic, payload))
                self._stats['delivered_crossbar'] += 1
                return
        except ImportError:
            pass  # crossbar_server not installed
        except Exception as e:
            logger.debug(f"WAMP publish failed for {legacy_topic}, "
                         f"trying HTTP fallback: {e}")

        # HTTP bridge fallback (injected by hart_intelligence at startup)
        if self._http_transport:
            try:
                self._http_transport(legacy_topic, payload)
                self._stats['delivered_crossbar'] += 1
            except Exception as e:
                # No Crossbar available — offline mode
                logger.debug(f"HTTP Crossbar publish failed for {legacy_topic}: {e}")

    def _deliver_to_subscribers(self, topic: str, data: dict):
        """Deliver to matching subscribers (exact + wildcard).

        Handlers are snapshotted under the lock and invoked outside it, so a
        handler may (un)subscribe without deadlocking. Handler exceptions are
        logged at debug level and never propagate.
        """
        import fnmatch
        with self._lock:
            # Exact match
            handlers = list(self._subscriptions.get(topic, []))

            # Wildcard match (e.g., 'chat.*' matches 'chat.response').
            # fnmatchcase: topics are case-sensitive on every OS (plain
            # fnmatch folds case on Windows). pattern == topic was already
            # collected above, so skip it to avoid double delivery.
            for pattern, pattern_handlers in self._subscriptions.items():
                if (pattern != topic and '*' in pattern
                        and fnmatch.fnmatchcase(topic, pattern)):
                    handlers.extend(pattern_handlers)

        for handler in handlers:
            try:
                handler(topic, data)
            except Exception as e:
                logger.debug(f"MessageBus subscriber error on {topic}: {e}")

493 

494 

495# ─── Singleton ──────────────────────────────────────── 

496 

# Process-wide singleton state: the lock serializes first-time creation in
# get_message_bus(); _bus stays None until then (or after a reset).
_bus_lock = threading.Lock()
_bus: Optional[MessageBus] = None

499 

500 

def get_message_bus() -> MessageBus:
    """Return the shared MessageBus, constructing it on first call.

    The unlocked ``None`` test is the fast path once the bus exists; the
    re-test under ``_bus_lock`` ensures concurrent first callers end up
    sharing a single instance.
    """
    global _bus
    if _bus is None:
        with _bus_lock:
            _bus = _bus if _bus is not None else MessageBus()
    return _bus

509 

510 

def reset_message_bus():
    """Forget the cached singleton (testing only).

    The next get_message_bus() call will build a brand-new MessageBus.
    """
    global _bus
    _bus = None  # drop the reference; no teardown is performed