Coverage for core / peer_link / message_bus.py: 93.0%
186 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2MessageBus — unified publish/subscribe across all transports.
4Every publish routes to ALL available transports, in this order:
5 1. LOCAL EventBus — always available, same-process delivery
6 2. SSE — same-machine cross-process (Flask → frontend), always available
7 3. PEERLINK — encrypted direct links to peers (when connected)
8 4. CROSSBAR — central telemetry + legacy mobile push (when internet available)
10Works at every level:
11 Single device offline → LOCAL + SSE
12 Multi-device LAN → LOCAL + SSE + PEERLINK (plain, same-user)
13 Multi-device WAN → LOCAL + SSE + PEERLINK (encrypted) + CROSSBAR
14 Full hive → LOCAL + SSE + PEERLINK (encrypted) + CROSSBAR
16The SSE leg fans out to the same-machine frontend via the Flask SSE
17broker (see ``core.platform.events.broadcast_sse_safe``). It is the
18only delivery path that does NOT depend on Crossbar / WAMP being up
19— so any chat upgrade, dashboard invalidate, or capability event
20remains visible to the local UI even when the WAMP router refuses
21connection. Without the SSE leg every caller would have to bolt on
22``broadcast_sse_safe`` by hand and inevitably forget it (the
23``_deliver_expert_to_user_async`` regression is exactly that
24shape — expert reply published to WAMP only, lost when Crossbar
25was down).
27Dedup: an LRU set of message IDs prevents double delivery when a message
28arrives via multiple transports.
30Topic mapping:
31 New topics use dot-notation: 'chat.response', 'task.progress'
32 Legacy Crossbar topics: 'com.hertzai.hevolve.chat.{user_id}'
33 Mapping is bidirectional for backward compatibility.
34"""
35import json
36import logging
37import os
38import threading
39import time
40import uuid
41from collections import OrderedDict
42from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger('hevolve.peer_link')


# Legacy topic mapping: new dot-notation topic → old Crossbar topic template.
# Placeholders such as {user_id} / {device_id} / {session_id} are filled at
# publish time (MessageBus._route_crossbar) from the publish() arguments or
# from the payload dict; a topic whose placeholders cannot be filled is not
# sent to Crossbar.
TOPIC_MAP = {
    # Per-user chat topics (frontend crossbarWorker.js subscribes to these)
    'chat.response': 'com.hertzai.hevolve.chat.{user_id}',
    'chat.action': 'com.hertzai.hevolve.action.{user_id}',
    'chat.general': 'com.hertzai.hevolve.{user_id}',
    'chat.analogy': 'com.hertzai.hevolve.analogy.{user_id}',
    'chat.social': 'com.hertzai.hevolve.social.{user_id}',
    'chat.pupit': 'com.hertzai.pupit.{user_id}',
    # Book parsing (percentage progress → frontend progress bar)
    'book.parsing': 'com.hertzai.bookparsing.{user_id}',
    # Task lifecycle (server-side tracking)
    'task.progress': 'com.hertzai.longrunning.log',
    'task.confirmation': 'com.hertzai.hevolve.confirmation',
    'task.exception': 'com.hertzai.hevolve.exception',
    'task.timeout': 'com.hertzai.hevolve.timeout',
    'task.intermediate': 'com.hertzai.hevolve.intermediate',
    'task.error': 'com.hertzai.hevolve.error',
    'task.actions': 'com.hertzai.hevolve.actions',
    'task.probe': 'com.hertzai.hevolve.probe',
    # Mobile / push
    'mobile.push': 'com.hertzai.hevolve.pupitpublish',
    # Agent coordination — per-user suffix mirrors chat/action/vision/...
    # convention so WAMP router ACL (#246) can gate this topic too.
    # (#510 follow-up — was a global topic before.)
    'agent.multichat': 'com.hertzai.hevolve.agent.multichat.{user_id}',
    # Game sessions
    'game.session': 'com.hertzai.hevolve.game.{session_id}',
    # Community
    'community.message': 'com.hertzai.hevolve.community.{community_id}',
    'community.feed': 'com.hertzai.community.feed',
    # Fleet commands (RN subscribes for TTS, agent consent, game dispatch)
    'fleet.command': 'com.hertzai.hevolve.fleet.{device_id}',
    # Fleet commands targeting all devices for a user (fan-out when device_id unknown)
    'fleet.command.user': 'com.hertzai.hevolve.fleet.user.{user_id}',
    # Mock interview (RN only)
    'mock_interview': 'com.hertzai.mock_interview.{user_id}',
    # Telemetry (node → central only, metadata, never content)
    'telemetry.node': 'com.hartos.telemetry.{node_id}',
    # Compute routing status (client shows real-time routing info)
    'compute.routing': 'com.hertzai.hevolve.compute.routing.{user_id}',
    # Compute relay — phone→HARTOS request + HARTOS→phone response (NAT traversal)
    'compute.request': 'com.hertzai.hevolve.compute.request.{user_id}',
    'compute.response': 'com.hertzai.hevolve.compute.response.{user_id}',
    # Remote desktop
    'remote_desktop.signal': 'com.hartos.remote_desktop.signal.{device_id}',
}

# Reverse lookup: legacy topic prefix → new topic.
# The prefix is the template text before its first '.{placeholder}' segment
# (str.split returns the whole template when there is no placeholder).
# Built with a comprehension so no loop variables leak into the module
# namespace (they previously ended up exported via ``import *``).
_REVERSE_MAP_UNSORTED = {
    legacy_template.split('.{')[0]: new_topic
    for new_topic, legacy_template in TOPIC_MAP.items()
}
# Sorted by prefix length (longest first) so 'com.hertzai.hevolve.chat'
# matches before the shorter 'com.hertzai.hevolve' (chat.general).
# sort() with reverse=True is stable, so equal-length prefixes keep
# their TOPIC_MAP order.
_REVERSE_MAP = dict(
    sorted(_REVERSE_MAP_UNSORTED.items(),
           key=lambda item: len(item[0]), reverse=True)
)
def resolve_legacy_topic(legacy_topic: str):
    """Translate a legacy Crossbar topic into ``(bus_topic, suffix)``.

    This is the SINGLE source of truth for legacy→bus topic resolution.
    Consumers: hart_intelligence.publish_async(), receive_from_crossbar().

    Prefixes are tried longest-first (the order of ``_REVERSE_MAP``). A
    prefix matches when it equals the whole topic (empty suffix) or when
    the topic continues with a '.'; the suffix is whatever follows that
    dot (typically the user_id).

    Returns:
        (bus_topic, suffix) for a mapped topic, or (None, '') otherwise.
    """
    for legacy_prefix, mapped_topic in _REVERSE_MAP.items():
        if legacy_topic == legacy_prefix:
            return mapped_topic, ''
        dotted_prefix = legacy_prefix + '.'
        if legacy_topic.startswith(dotted_prefix):
            return mapped_topic, legacy_topic[len(dotted_prefix):]
    return None, ''
def chat_topic_for(user_id: str) -> str:
    """Build the legacy per-user WAMP chat-bubble topic for ``user_id``.

    Keeps the legacy topic name in one place instead of repeating the
    inline f-string at every publish call site. The result is
    byte-identical to what those call sites produced, so subscribers
    (Android RN, Web SPA, Nunba adapter) see no wire change — this is a
    refactor seam for the day the legacy name is retired.
    """
    topic = f'com.hertzai.hevolve.chat.{user_id}'
    return topic
class _LRUDedup:
    """Bounded LRU set of message IDs used for deduplication.

    ``check_and_add`` is O(1). Seeing an ID again refreshes its recency, so
    an ID that keeps arriving (e.g. the same message over several
    transports) is the last to be evicted once ``maxsize`` is exceeded.
    Guarded by an internal lock so concurrent callers see consistent state.
    """

    def __init__(self, maxsize: int = 10000):
        # Insertion/recency-ordered keys; values are unused placeholders.
        self._cache: OrderedDict = OrderedDict()
        self._maxsize = maxsize
        self._lock = threading.Lock()

    def check_and_add(self, msg_id: str) -> bool:
        """Record ``msg_id``; return True if it is new, False if a duplicate."""
        with self._lock:
            if msg_id in self._cache:
                # Refresh recency so eviction order is genuinely LRU
                # (previously duplicates kept their original slot → FIFO).
                self._cache.move_to_end(msg_id)
                return False  # Duplicate
            self._cache[msg_id] = True
            if len(self._cache) > self._maxsize:
                self._cache.popitem(last=False)  # evict least recently seen
            return True  # New
class MessageBus:
    """Unified pub/sub across LOCAL + SSE + PEERLINK + CROSSBAR.

    Usage:
        bus = get_message_bus()
        bus.subscribe('chat.response', handler)
        bus.publish('chat.response', {'user_id': '123', 'text': 'Hello'})
    """

    def __init__(self):
        # topic (exact, or fnmatch-style pattern containing '*') -> handlers
        self._subscriptions: Dict[str, List[Callable]] = {}
        self._lock = threading.Lock()
        # Recently seen msg_ids, so a message arriving over several
        # transports reaches local subscribers only once.
        self._dedup = _LRUDedup(maxsize=10000)
        self._http_transport: Optional[Callable] = None  # injected Crossbar HTTP fallback
        self._stats = {
            'published': 0,
            'delivered_local': 0,
            'delivered_sse': 0,
            'delivered_peerlink': 0,
            'delivered_crossbar': 0,
            'deduplicated': 0,
        }

    def set_http_transport(self, transport_fn: Callable) -> None:
        """Inject HTTP Crossbar transport (avoids layering violation).

        Called by hart_intelligence at startup to provide the HTTP publish
        fallback without MessageBus importing from hart_intelligence.

        Args:
            transport_fn: callable(topic: str, payload: str) -> None
        """
        self._http_transport = transport_fn

    def publish(self, topic: str, data: dict = None,
                user_id: str = '', device_id: str = '',
                skip_crossbar: bool = False,
                skip_peerlink: bool = False,
                skip_sse: bool = False) -> str:
        """Publish a message to all available transports.

        Fan-out order (each leg is independent — failure in one never
        blocks the others):

          1. LOCAL    — same-process subscribers + EventBus (always)
          2. SSE      — same-machine Flask → frontend, no network (always)
          3. PEERLINK — P2P device-to-device (BLE / local Wi-Fi) (best-effort)
          4. CROSSBAR — WAMP / cross-network (best-effort)

        SSE is treated like LOCAL trust-wise (same-machine, loopback
        only, MCP-token gated) so payloads pass through unredacted.
        Outbound legs (PEERLINK + CROSSBAR) get the DLP scrub.

        SUBTLE — Nunba's adapter does NOT see direct ``bus.publish``
        calls. Nunba's ``routes/hartos_backend_adapter.py``
        monkey-patches ``hart_intelligence.publish_async``, not this
        method. Callers that go directly through the bus
        (``bus.publish(...)``) bypass that monkey-patch — Nunba's
        per-request thinking-trace buffer never sees those messages.
        For chat-bubble publishes, prefer
        ``hart_intelligence.publish_async(chat_topic_for(user_id),
        json.dumps(payload))`` so the interceptor still fires.
        Migration to a bus subscriber on ``chat.response`` is the
        right long-term shape (tracked in
        ``memory/project_publish_aop_migration.md``).

        Args:
            topic: Dot-notation topic (e.g., 'chat.response')
            data: Message payload (JSON-serializable dict). It is shallow-
                copied before ``user_id`` is injected, so the caller's
                dict is never mutated.
            user_id: For per-user topic routing (substituted into legacy topics)
            device_id: For per-device topic routing
            skip_crossbar: Don't publish to Crossbar (for local-only events)
            skip_peerlink: Don't publish to PeerLink (for same-process events)
            skip_sse: Don't publish to the same-machine SSE broker
                (for events that should NEVER reach the local UI, e.g.
                pure server-to-server telemetry). Default False so
                every existing caller automatically gains the SSE leg.

        Returns:
            Message ID (for dedup/tracking)
        """
        # Copy so the user_id injection below never leaks into the caller's dict.
        data = dict(data) if data else {}
        msg_id = uuid.uuid4().hex[:16]
        if user_id:
            data.setdefault('user_id', user_id)

        self._stats['published'] += 1

        # 1. LOCAL — always deliver (unredacted, same process)
        self._route_local(topic, data, msg_id)

        # 2. SSE — same-machine cross-process, unredacted (same trust as LOCAL).
        # Always attempted; the helper no-ops gracefully if the SSE broker
        # hasn't been set up yet (e.g. tests, or pre-Flask boot). This is the
        # leg that keeps the local UI working when Crossbar / WAMP is down.
        if not skip_sse:
            self._route_sse(topic, data, user_id, msg_id)

        if skip_peerlink and skip_crossbar:
            return msg_id  # no outbound leg — skip the DLP scrub entirely

        # Redact secrets before outbound transmission (PeerLink + Crossbar)
        outbound_data = self._redact_outbound(data)

        # 3. PEERLINK — if connected peers exist
        if not skip_peerlink:
            self._route_peerlink(topic, outbound_data, msg_id)

        # 4. CROSSBAR — if internet available (and not skipped)
        if not skip_crossbar:
            self._route_crossbar(topic, outbound_data, user_id, device_id, msg_id)

        return msg_id

    def subscribe(self, topic: str, handler: Callable) -> None:
        """Subscribe to a topic.

        Handler signature: handler(topic: str, data: dict)
        Supports wildcard: 'chat.*' matches 'chat.response', 'chat.action'
        """
        with self._lock:
            self._subscriptions.setdefault(topic, []).append(handler)

    def unsubscribe(self, topic: str, handler: Callable) -> None:
        """Remove one registration of ``handler`` for ``topic`` (no-op if absent)."""
        with self._lock:
            handlers = self._subscriptions.get(topic, [])
            if handler in handlers:
                handlers.remove(handler)

    def receive_from_peer(self, envelope: dict) -> bool:
        """Handle message received via PeerLink.

        Deduplicates and delivers to local subscribers.
        Called by ChannelDispatcher when 'events' channel message arrives.

        Returns:
            True if delivered; False if the envelope has no msg_id or the
            msg_id was already seen.
        """
        msg_id = envelope.get('msg_id', '')
        if not msg_id:
            return False

        if not self._dedup.check_and_add(msg_id):
            self._stats['deduplicated'] += 1
            return False  # Already delivered via another transport

        topic = envelope.get('topic', '')
        data = envelope.get('data', {})

        self._deliver_to_subscribers(topic, data)
        return True

    def receive_from_crossbar(self, legacy_topic: str, message: Any) -> bool:
        """Handle message received via Crossbar (legacy path).

        Maps the legacy topic to a bus topic via ``resolve_legacy_topic``
        (the single source of truth, which requires a '.' boundary after
        the prefix) and delivers to local subscribers. Unknown topics pass
        through unchanged. Non-dict messages are wrapped as ``{'raw': ...}``.

        Returns:
            True if delivered; False if the msg_id was already seen.
        """
        new_topic, _suffix = resolve_legacy_topic(legacy_topic)
        if not new_topic:
            new_topic = legacy_topic  # Pass through unknown topics

        # NOTE(review): _route_crossbar publishes JSON *strings*. If the WAMP
        # callback hands that string over undecoded, it lands under 'raw' and
        # its embedded msg_id is not used for dedup — confirm callers decode.
        data = message if isinstance(message, dict) else {'raw': str(message)}

        msg_id = data.get('msg_id', '') or uuid.uuid4().hex[:16]
        if not self._dedup.check_and_add(msg_id):
            self._stats['deduplicated'] += 1
            return False

        self._deliver_to_subscribers(new_topic, data)
        return True

    def get_stats(self) -> dict:
        """Return a snapshot copy of the per-transport delivery counters."""
        return dict(self._stats)

    # ─── Internal routing ────────────────────────────────

    def _redact_outbound(self, data: dict) -> dict:
        """Return ``data`` scrubbed by the DLP engine for the outbound legs.

        Best-effort: when the DLP engine is not installed, or the payload
        cannot be JSON-encoded, or redaction fails, the original ``data`` is
        returned unchanged. Failures are logged rather than silently dropped.
        """
        try:
            from security.dlp_engine import redact_pii
        except ImportError:
            return data  # DLP not installed — proceed unredacted
        except Exception as e:
            logger.warning(f"DLP engine failed to load; outbound payload not redacted: {e}")
            return data

        try:
            raw = json.dumps(data)
        except (TypeError, ValueError) as e:
            logger.debug(f"Outbound payload not JSON-serializable, DLP skipped: {e}")
            return data

        try:
            redacted = redact_pii(raw)
            return json.loads(redacted) if redacted != raw else data
        except Exception as e:
            logger.warning(f"DLP redaction failed; sending outbound payload unredacted: {e}")
            return data

    def _route_local(self, topic: str, data: dict, msg_id: str):
        """Deliver to local EventBus + direct subscribers."""
        # Mark as seen so the same message echoed back over another
        # transport is dropped by receive_from_peer / receive_from_crossbar.
        self._dedup.check_and_add(msg_id)

        # Direct subscribers
        self._deliver_to_subscribers(topic, data)

        # Also emit to EventBus (for cross-subsystem communication)
        try:
            from core.platform.events import emit_event
            emit_event(f'bus.{topic}', data)
        except Exception:
            pass  # EventBus unavailable — direct subscribers already served

        self._stats['delivered_local'] += 1

    def _route_sse(self, topic: str, data: dict, user_id: str, msg_id: str):
        """Push to the same-machine SSE broker (Flask → frontend).

        Uses ``core.platform.events.broadcast_sse_safe`` which encapsulates
        the ``import __main__`` + ``sys.modules['main_module']`` fallback
        chain (Nunba's SSE registry lives on ``__main__``). The helper
        returns ``False`` if the SSE broker hasn't been wired up yet —
        we increment the stat only on confirmed delivery so test runs
        and pre-Flask boot don't inflate the counter.

        Event type is the bus topic itself (1:1 mapping). Frontends
        listening for legacy event names (``'notification'``,
        ``'message'``, ``'capability_update'``, etc.) will keep working
        because those callers continue to invoke ``broadcast_sse_safe``
        directly with their legacy event_type — this bus leg is purely
        additive for callers that didn't have an SSE leg before.

        Best-effort — never raises. An SSE failure must not block the
        outbound legs (PEERLINK / CROSSBAR) or the LOCAL deliveries that
        already happened.
        """
        try:
            from core.platform.events import broadcast_sse_safe
        except Exception:
            return  # core.platform.events not importable — fail-closed silent
        try:
            delivered = broadcast_sse_safe(
                topic, data, user_id=(user_id or None))
        except Exception as e:
            logger.debug(f"SSE route failed for {topic}: {e}")
            return
        if delivered:
            self._stats['delivered_sse'] += 1

    def _route_peerlink(self, topic: str, data: dict, msg_id: str):
        """Send to connected peers via PeerLink 'events' channel."""
        try:
            from core.peer_link.link_manager import get_link_manager
            mgr = get_link_manager()

            envelope = {
                'msg_id': msg_id,
                'topic': topic,
                'data': data,
            }

            sent = mgr.broadcast('events', envelope)
            if sent > 0:
                self._stats['delivered_peerlink'] += sent
        except Exception:
            pass  # No PeerLink available — that's fine

    def _route_crossbar(self, topic: str, data: dict,
                        user_id: str, device_id: str, msg_id: str):
        """Publish to Crossbar for legacy mobile app + central telemetry.

        Resolves the legacy WAMP topic from TOPIC_MAP and fills its
        ``{placeholder}`` segments (``user_id`` / ``device_id`` prefer the
        explicit arguments; every other placeholder comes from ``data``).
        Tries the native WAMP session first, then the injected HTTP bridge.
        Best-effort: unmapped topics, unfillable placeholders and payloads
        that cannot be JSON-encoded are skipped instead of raising into
        ``publish``.
        """
        legacy_topic = TOPIC_MAP.get(topic)
        if not legacy_topic:
            return  # No legacy mapping — skip Crossbar

        # Substitute template variables
        import re as _re
        for key in _re.findall(r'\{(\w+)\}', legacy_topic):
            if key == 'user_id':
                val = user_id or data.get('user_id', '')
            elif key == 'device_id':
                val = device_id or data.get('device_id', '')
            else:
                val = data.get(key, '')
            if not val:
                return  # Can't route without required variable
            legacy_topic = legacy_topic.replace(f'{{{key}}}', str(val))

        # Add msg_id for dedup on the receiving side (on a copy).
        if isinstance(data, dict):
            data = dict(data)
            data['msg_id'] = msg_id

        try:
            payload = (json.dumps(data, separators=(',', ':'))
                       if isinstance(data, dict) else str(data))
        except (TypeError, ValueError) as e:
            logger.debug(f"Crossbar route skipped for {topic}: payload not JSON-serializable: {e}")
            return

        # Try native WAMP session first (crossbar_server is optional)
        if self._publish_via_wamp(legacy_topic, payload):
            self._stats['delivered_crossbar'] += 1
            return

        # HTTP bridge fallback (injected by hart_intelligence at startup)
        if self._http_transport:
            try:
                self._http_transport(legacy_topic, payload)
                self._stats['delivered_crossbar'] += 1
            except Exception as e:
                logger.debug(f"Crossbar HTTP publish failed for {legacy_topic}: {e}")

    def _publish_via_wamp(self, legacy_topic: str, payload: str) -> bool:
        """Schedule ``payload`` on the native WAMP session, if one is available.

        Returns True once the publish has been handed to
        ``asyncio.ensure_future``; False when ``crossbar_server`` is not
        importable, the session is unset, or scheduling failed — the caller
        then falls back to the HTTP bridge.
        """
        try:
            from crossbar_server import wamp_session
        except Exception:
            return False  # crossbar_server not installed / not importable
        if not wamp_session:
            return False

        import asyncio
        pending = None
        try:
            pending = wamp_session.publish(legacy_topic, payload)
            asyncio.ensure_future(pending)
        except Exception as e:
            # e.g. RuntimeError when this thread has no asyncio event loop.
            if asyncio.iscoroutine(pending):
                pending.close()  # avoid a "coroutine was never awaited" warning
            logger.debug(f"WAMP publish failed for {legacy_topic}: {e}")
            return False
        return True

    def _deliver_to_subscribers(self, topic: str, data: dict):
        """Deliver to matching subscribers (exact + wildcard).

        Handlers are snapshotted under the lock and invoked outside it, so a
        handler may (un)subscribe without deadlocking. A handler exception is
        logged and does not stop delivery to the remaining handlers.
        """
        import fnmatch

        with self._lock:
            # Exact match
            handlers = list(self._subscriptions.get(topic, []))

            # Wildcard match (e.g., 'chat.*' matches 'chat.response').
            # pattern == topic is already covered by the exact match above.
            # fnmatchcase keeps matching case-sensitive on every OS
            # (fnmatch.fnmatch applies os.path.normcase, e.g. on Windows).
            for pattern, pattern_handlers in self._subscriptions.items():
                if (pattern != topic and '*' in pattern
                        and fnmatch.fnmatchcase(topic, pattern)):
                    handlers.extend(pattern_handlers)

        for handler in handlers:
            try:
                handler(topic, data)
            except Exception as e:
                logger.debug(f"MessageBus subscriber error on {topic}: {e}")
495# ─── Singleton ────────────────────────────────────────
497_bus: Optional[MessageBus] = None
498_bus_lock = threading.Lock()
501def get_message_bus() -> MessageBus:
502 """Get or create the singleton MessageBus."""
503 global _bus
504 if _bus is None:
505 with _bus_lock:
506 if _bus is None:
507 _bus = MessageBus()
508 return _bus
def reset_message_bus():
    """Forget the cached singleton so the next get_message_bus() builds anew.

    Intended for tests only.
    """
    global _bus
    _bus = None