Coverage for core / platform / bootstrap.py: 62.3%
199 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Platform Bootstrap — One-time initialization that wires all platform services.
4Called once at server start (from hart_intelligence or tests).
5Registers core services, migrates existing shell_manifest.py panels,
6detects native apps, loads extensions, and starts lifecycle services.
8Usage:
9 from core.platform.bootstrap import bootstrap_platform
10 registry = bootstrap_platform()
11 # All services now available:
12 # registry.get('events') -> EventBus
13 # registry.get('apps') -> AppRegistry
14 # registry.get('extensions') -> ExtensionRegistry
15"""
17import logging
18import os
19import shutil
20from typing import Optional
22from core.platform.registry import ServiceRegistry, get_registry
23from core.platform.events import EventBus
24from core.platform.cache import CacheService
25from core.platform.app_manifest import AppManifest, AppType
26from core.platform.app_registry import AppRegistry
27from core.platform.extensions import ExtensionRegistry
29logger = logging.getLogger('hevolve.platform')
def bootstrap_platform(extensions_dir: Optional[str] = None) -> ServiceRegistry:
    """Initialise the platform once and return the shared service registry.

    The call is idempotent: when the global registry already holds an
    'events' service it is returned immediately and nothing is re-wired.

    Args:
        extensions_dir: Directory scanned for extensions. If None, the
            'extensions' folder three directory levels above this file
            is used.

    Returns:
        The global ServiceRegistry populated by this function.
    """
    registry = get_registry()

    if registry.has('events'):
        # A previous call already did the wiring.
        return registry

    # ---- Core services ---------------------------------------------------

    # Event bus goes first: the factories below hand bus.emit to their service.
    registry.register('events', EventBus, singleton=True)
    bus = registry.get('events')

    # RSI-4 (realtime self-improvement trigger): subscribe the usage-signal
    # tracker to the bus. Per the original design note, binding is cheap and
    # is done unconditionally so the tracker's counters stay populated for
    # dashboards; the enqueue leg is described as firing only when
    # HEVOLVE_RSI_REALTIME=1, and promoted candidates are still gated by
    # RSI-1 + RSI-2 inside autoevolve_code_tools.commit_improvement.
    # Any failure here is logged at debug level and otherwise ignored.
    try:
        from integrations.agent_engine.rsi_trigger import get_rsi_trigger
        get_rsi_trigger().bind_to_bus()
    except Exception as exc:
        logger.debug('rsi_trigger bind skipped: %s', exc)

    # Unified cache service.
    registry.register('cache', CacheService, singleton=True)

    # Central app catalog, emitting through the event bus.
    def _build_app_registry():
        return AppRegistry(event_emitter=bus.emit)

    registry.register('apps', _build_app_registry, singleton=True)
    apps = registry.get('apps')

    # Extension lifecycle registry.
    def _build_extension_registry():
        return ExtensionRegistry(service_registry=registry,
                                 event_emitter=bus.emit)

    registry.register('extensions', _build_extension_registry, singleton=True)

    # Capability router; the model registry and VRAM manager are optional
    # and replaced by None when their modules cannot be imported.
    def _make_capability_router():
        from core.platform.ai_capabilities import CapabilityRouter
        try:
            from integrations.agent_engine.model_registry import model_registry
        except ImportError:
            model_registry = None
        try:
            from integrations.service_tools.vram_manager import vram_manager
        except ImportError:
            vram_manager = None
        return CapabilityRouter(model_registry=model_registry,
                                vram_manager=vram_manager)

    registry.register('capability_router', _make_capability_router, singleton=True)

    # Agent execution environments.
    from core.platform.agent_environment import EnvironmentManager

    def _build_environment_manager():
        return EnvironmentManager(service_registry=registry,
                                  event_emitter=bus.emit)

    registry.register('environments', _build_environment_manager, singleton=True)

    # ---- Populate the app catalog ----------------------------------------

    _migrate_shell_manifest(apps)
    _register_native_apps(apps)

    # ---- Extensions ------------------------------------------------------

    if extensions_dir is None:
        # Walk three levels up from this file, then into 'extensions'.
        repo_root = os.path.abspath(__file__)
        for _ in range(3):
            repo_root = os.path.dirname(repo_root)
        extensions_dir = os.path.join(repo_root, 'extensions')

    ext_reg = registry.get('extensions')
    if os.path.isdir(extensions_dir):
        # Signature check (WS14) runs before anything is loaded.
        _verify_extension_signatures(extensions_dir)
        ext_reg.load_from_directory(extensions_dir)

    # ---- Orchestrator services -------------------------------------------

    _register_orchestrator_services(registry)

    # ---- PeerLink (P2P communication) ------------------------------------
    # Entirely optional: any failure is logged at debug level and skipped.
    try:
        from core.peer_link.link_manager import get_link_manager
        from core.peer_link.telemetry import get_central_connection
        from core.peer_link.message_bus import get_message_bus

        for service_name, factory in (
                ('peer_link', get_link_manager),
                ('message_bus', get_message_bus),
                ('central_connection', get_central_connection)):
            registry.register(service_name, factory, singleton=True)

        registry.get('peer_link').start()
        registry.get('central_connection').start()

        logger.debug("PeerLink services registered")
    except Exception as exc:
        logger.debug("PeerLink not available: %s", exc)

    # ---- Crossbar WAMP bridge (only when CBURL is set) -------------------

    wamp_url = os.environ.get('CBURL')
    if wamp_url:
        bus.connect_wamp(wamp_url, os.environ.get('CBREALM', 'realm1'))

    # ---- Subscribers for otherwise unconsumed events ---------------------

    _wire_event_subscribers(bus)

    # ---- Lifecycle start -------------------------------------------------

    registry.start_all()

    logger.info("Platform bootstrapped: %d apps, %d extensions",
                apps.count(), ext_reg.count())

    return registry
def _wire_event_subscribers(bus) -> None:
    """Attach subscribers for events that are published but had no consumer.

    Every handler is written to be fail-safe: payloads that are not dicts
    are treated as empty, and failures of optional integrations (TTS,
    audit log, message bus, federation, SSE) are logged at debug level
    instead of propagating to the emitter.

    Args:
        bus: Event bus object exposing ``on(topic, handler)``; handlers are
            invoked as ``handler(topic, data)``.
    """

    def _payload(data):
        # Handlers only understand dict payloads; anything else (None, a
        # string, ...) is replaced by an empty dict so .get() never raises.
        return data if isinstance(data, dict) else {}

    # 1. tts.speak -> synthesize speech and announce it on the fleet channel.
    def _on_tts_speak(topic, data):
        data = _payload(data)
        try:
            from integrations.channels.media.tts import TTSEngine
            import asyncio
            text = data.get('text', '')
            if not text:
                return
            engine = TTSEngine()  # defaults to Pocket TTS (offline, CPU)
            # A private loop is created per call and always closed afterwards.
            loop = asyncio.new_event_loop()
            try:
                audio = loop.run_until_complete(engine.synthesize(text))
                if audio:
                    logger.debug("TTS synthesized %d bytes for user %s",
                                 len(audio), data.get('user_id', ''))
                    # Push audio to fleet command channel for mobile playback.
                    # Uses cmd_type (not 'command') to match the registry in
                    # services/fleetCommandHandler.js (COMMAND_HANDLERS).
                    try:
                        from core.peer_link.message_bus import get_message_bus
                        get_message_bus().publish('fleet.command.user', {
                            'cmd_type': 'tts_stream',
                            'text': text[:500],
                            'audio_size': len(audio),
                        }, user_id=data.get('user_id', ''))
                    except Exception as e:
                        logger.debug("TTS fleet publish skipped: %s", e)
            finally:
                loop.close()
        except Exception as e:
            logger.debug("TTS speak handler error: %s", e)

    bus.on('tts.speak', _on_tts_speak)

    # 2. action.retry_exhausted -> warning log plus audit-trail entry.
    def _on_retry_exhausted(topic, data):
        data = _payload(data)
        action_id = data.get('action_id', '')
        max_retries = data.get('max_retries', 0)
        logger.warning("Action %s exhausted %d retries — marking failed",
                       action_id, max_retries)
        try:
            from security.immutable_audit_log import get_audit_log
            get_audit_log().log_event(
                'action_retry_exhausted', actor_id=action_id,
                action=f'exhausted {max_retries} retries',
                detail={'max_retries': max_retries,
                        'user_id': data.get('user_id', '')})
        except Exception as e:
            logger.debug("Audit log for action.retry_exhausted skipped: %s", e)

    bus.on('action.retry_exhausted', _on_retry_exhausted)

    # 3. memory.item_deleted -> bump the aggregator's per-event counter.
    def _on_memory_deleted(topic, data):
        try:
            from integrations.agent_engine.federated_aggregator import get_federated_aggregator
            agg = get_federated_aggregator()
            if agg and hasattr(agg, '_event_counters'):
                agg._event_counters['memory.item_deleted'] = (
                    agg._event_counters.get('memory.item_deleted', 0) + 1)
        except Exception as e:
            logger.debug("memory.item_deleted counter update skipped: %s", e)

    bus.on('memory.item_deleted', _on_memory_deleted)

    # 4. security.extension_blocked -> warning log plus audit-trail entry.
    def _on_extension_blocked(topic, data):
        data = _payload(data)
        module = data.get('module', 'unknown')
        violations = data.get('violations', [])
        logger.warning("Extension blocked: %s (violations: %s)", module, violations)
        try:
            from security.immutable_audit_log import get_audit_log
            get_audit_log().log_event(
                'extension_blocked', actor_id=module,
                action=f'extension blocked: {module}',
                detail={'violations': violations})
        except Exception as e:
            logger.debug("Audit log for security.extension_blocked skipped: %s", e)

    bus.on('security.extension_blocked', _on_extension_blocked)

    # 5. notification.unconfirmed -> fan out to the user's devices.
    def _on_unconfirmed(topic, data):
        data = _payload(data)
        user_id = data.get('user_id', '')
        if not user_id:
            return
        # Publish with cmd_type to match the RN COMMAND_HANDLERS registry.
        # Uses fleet.command.user topic for fan-out (no specific device_id).
        try:
            from core.peer_link.message_bus import get_message_bus
            get_message_bus().publish('fleet.command.user', {
                'cmd_type': 'notification_unconfirmed',
                'msg_id': data.get('msg_id', ''),
                'topic': data.get('topic', ''),
            }, user_id=user_id)
        except Exception as e:
            logger.debug("notification.unconfirmed publish skipped: %s", e)

    bus.on('notification.unconfirmed', _on_unconfirmed)

    # 6. Dashboard invalidation -> one SSE 'dashboard.invalidate' event so
    #    the admin UI can re-fetch on change rather than waiting for a poll.
    #    This handler is the single fan-in point: further sources should be
    #    subscribed here instead of duplicating the SSE bridge.
    def _on_dashboard_invalidate(topic, data):
        try:
            from core.platform.events import broadcast_sse_safe
            broadcast_sse_safe('dashboard.invalidate', {
                'reason': topic,
                'detail': _payload(data),
            }, user_id=None)  # broadcast — admin UI is a single-user surface
        except Exception as e:
            logger.debug("dashboard.invalidate broadcast skipped: %s", e)

    for _topic in ('agent_goal.changed', 'coding_goal.changed',
                   'action_state.changed', 'daemon.status.changed',
                   'inference.completed'):
        bus.on(_topic, _on_dashboard_invalidate)

    logger.debug("EventBus subscribers wired: tts.speak, action.retry_exhausted, "
                 "memory.item_deleted, security.extension_blocked, notification.unconfirmed, "
                 "dashboard.invalidate")
def _migrate_shell_manifest(apps: AppRegistry) -> None:
    """Import the panels declared in shell_manifest.py into the AppRegistry.

    Static and system panels are loaded through the registry's own loaders;
    each dynamic panel whose id is not yet registered is converted into an
    AppManifest and registered.

    If shell_manifest cannot be imported (e.g. isolated tests) the function
    does nothing beyond a debug log; any other failure is logged as a
    warning and swallowed.

    Args:
        apps: Registry receiving the migrated panels.
    """
    try:
        from integrations.agent_engine.shell_manifest import (
            PANEL_MANIFEST, SYSTEM_PANELS, DYNAMIC_PANELS,
        )
        panel_count = apps.load_panel_manifest(PANEL_MANIFEST)
        system_count = apps.load_system_panels(SYSTEM_PANELS)

        # Snapshot the registered ids once instead of rebuilding the list
        # for every dynamic panel; the set is kept current as we register.
        known_ids = {m.id for m in apps.list_all()}

        dynamic_count = 0
        for panel_id, panel in DYNAMIC_PANELS.items():
            if panel_id in known_ids:
                continue
            manifest = AppManifest(
                id=panel_id,
                name=panel.get('title', panel_id),
                version='1.0.0',
                type=AppType.DYNAMIC_PANEL.value,
                icon=panel.get('icon', 'open_in_new'),
                entry={'route': panel.get('route', '')},
                group=panel.get('group', ''),
                default_size=tuple(panel.get('default_size', [700, 500])),
            )
            apps.register(manifest)
            known_ids.add(panel_id)
            dynamic_count += 1

        logger.debug("Migrated %d panels, %d system, %d dynamic from shell_manifest",
                     panel_count, system_count, dynamic_count)

    except ImportError:
        logger.debug("shell_manifest.py not available — skipping migration")
    except Exception as e:
        logger.warning("Shell manifest migration failed: %s", e)
def _register_native_apps(apps: AppRegistry) -> None:
    """Register the known native applications that are installed locally.

    A candidate counts as installed when shutil.which() resolves the
    executable named in its manifest entry. Candidates whose id is already
    present in the registry are left alone.

    Args:
        apps: Registry that receives the detected applications.
    """
    catalog = [
        AppManifest(
            id='nunba',
            name='Nunba',
            version='auto',
            type=AppType.DESKTOP_APP.value,
            icon='hub',
            entry={
                'exec': 'nunba',
                'http': 'http://localhost:5000',
                'backend': 'http://localhost:6777',
            },
            group='System',
            platforms=['linux', 'windows', 'macos'],
            permissions=['network', 'display'],
            description='HART OS companion app — chat, communities, agents',
            tags=['chat', 'agents', 'ui', 'nunba', 'hart'],
        ),
        AppManifest(
            id='rustdesk',
            name='RustDesk',
            version='auto',
            type=AppType.DESKTOP_APP.value,
            icon='desktop_windows',
            entry={
                'exec': 'rustdesk',
                'bridge': 'rustdesk_bridge',
            },
            group='Remote',
            platforms=['linux', 'windows', 'macos'],
            permissions=['network', 'display', 'input'],
            description='Open-source remote desktop',
            tags=['remote', 'vnc', 'desktop'],
        ),
        AppManifest(
            id='sunshine',
            name='Sunshine',
            version='auto',
            type=AppType.SERVICE.value,
            icon='wb_sunny',
            entry={
                'exec': 'sunshine',
                'bridge': 'sunshine_bridge',
                'http': 'https://localhost:47990',
            },
            group='Remote',
            platforms=['linux', 'windows'],
            permissions=['network', 'display'],
            description='GPU-accelerated game streaming host',
            tags=['remote', 'streaming', 'gaming'],
        ),
        AppManifest(
            id='moonlight',
            name='Moonlight',
            version='auto',
            type=AppType.DESKTOP_APP.value,
            icon='nightlight',
            entry={
                'exec': 'moonlight',
                'bridge': 'sunshine_bridge',
            },
            group='Remote',
            platforms=['linux', 'windows', 'macos'],
            permissions=['network', 'display'],
            description='Low-latency game streaming client',
            tags=['remote', 'streaming', 'gaming'],
        ),
    ]

    found = 0
    for candidate in catalog:
        binary = candidate.entry.get('exec', '')
        # Skip entries without an executable name or whose binary is not on PATH.
        if not binary or not shutil.which(binary):
            continue
        # Never overwrite an app that is already registered under this id.
        if apps.get(candidate.id):
            continue
        apps.register(candidate)
        found += 1
        logger.debug("Detected native app: %s", candidate.name)

    if found:
        logger.info("Detected %d native apps", found)
def _register_orchestrator_services(registry: ServiceRegistry) -> None:
    """Register the agent orchestrator services with the platform registry.

    'agent_daemon' and 'federation' are registered through factories that
    import their module only when invoked and return None when the import
    fails. The remote desktop orchestrator is obtained immediately and its
    startup() is run on a daemon thread so boot is not blocked.

    Every step fails open: a failed registration is logged and the
    remaining services are still attempted.

    Args:
        registry: Service registry to populate.
    """
    # AgentDaemon — background goal dispatch
    def _make_agent_daemon():
        try:
            from integrations.agent_engine.agent_daemon import agent_daemon
            return agent_daemon
        except ImportError:
            logger.debug("AgentDaemon not available — skipping")
            return None

    try:
        registry.register('agent_daemon', _make_agent_daemon, singleton=True)
    except Exception as e:
        # Previously swallowed silently; keep failing open but leave a trace.
        logger.debug("agent_daemon registration failed: %s", e)

    # FederatedAggregator — hive learning aggregation
    def _make_federated_aggregator():
        try:
            from integrations.agent_engine.federated_aggregator import (
                get_federated_aggregator,
            )
            return get_federated_aggregator()
        except ImportError:
            logger.debug("FederatedAggregator not available — skipping")
            return None

    try:
        registry.register('federation', _make_federated_aggregator, singleton=True)
    except Exception as e:
        # Previously swallowed silently; keep failing open but leave a trace.
        logger.debug("federation registration failed: %s", e)

    # Remote Desktop Orchestrator — instance is created here, startup is
    # deferred to a background thread.
    try:
        from integrations.remote_desktop.orchestrator import get_orchestrator
        orchestrator = get_orchestrator()
        registry.register('remote_desktop', lambda: orchestrator, singleton=True)
        # Startup in background thread to avoid blocking boot
        import threading
        threading.Thread(
            target=orchestrator.startup, daemon=True,
            name='orchestrator-boot').start()
    except ImportError:
        logger.debug("Remote Desktop Orchestrator not available — skipping")
    except Exception as e:
        logger.warning("Remote Desktop Orchestrator startup: %s", e)
def _quarantine_extension(ext_path: str, suffix: str) -> bool:
    """Rename an extension directory by appending *suffix* to its path.

    Returns True when the rename succeeded. A failed rename is logged as a
    warning because the directory then stays in place under its original
    name.

    NOTE(review): this relies on the extension loader ignoring the renamed
    directory — confirm against ExtensionRegistry.load_from_directory.
    """
    try:
        os.rename(ext_path, ext_path + suffix)
    except OSError as e:
        logger.warning(
            "Could not rename extension directory '%s' to '%s': %s",
            ext_path, ext_path + suffix, e)
        return False
    return True


def _verify_extension_signatures(extensions_dir: str) -> None:
    """Check the manifest signature of every extension before loading.

    For each sub-directory of *extensions_dir* that contains a
    ``manifest.json``, the sibling ``manifest.sig`` is verified with
    ``security.master_key.verify_release``.

    Modes, selected by the HART_REQUIRE_SIGNED_EXTENSIONS environment
    variable:

    * unset / anything but '1' (advisory): unsigned or badly signed
      extensions are only logged.
    * '1' (enforced): an extension without a signature is renamed to
      ``<dir>.unsigned``; one whose signature is invalid, or whose check
      raised an error, is renamed to ``<dir>.badsig``.

    If ``security.master_key`` cannot be imported no verification is
    possible; this is logged (as a warning under enforcement) and the
    extension is left in place.

    Args:
        extensions_dir: Existing directory containing one sub-directory per
            extension.
    """
    require_signed = os.environ.get('HART_REQUIRE_SIGNED_EXTENSIONS', '') == '1'

    for entry in os.listdir(extensions_dir):
        ext_path = os.path.join(extensions_dir, entry)
        if not os.path.isdir(ext_path):
            continue

        manifest_path = os.path.join(ext_path, 'manifest.json')
        sig_path = os.path.join(ext_path, 'manifest.sig')

        # Directories without a manifest are not extensions.
        if not os.path.isfile(manifest_path):
            continue

        if not os.path.isfile(sig_path):
            if require_signed:
                logger.warning(
                    "Extension '%s' has no signature — skipping (HART_REQUIRE_SIGNED_EXTENSIONS=1)",
                    entry)
                _quarantine_extension(ext_path, '.unsigned')
            else:
                logger.debug("Extension '%s' is unsigned (advisory)", entry)
            continue

        # Verify signature using master public key
        try:
            from security.master_key import verify_release
            with open(manifest_path, 'rb') as f:
                manifest_bytes = f.read()
            with open(sig_path, 'rb') as f:
                sig_bytes = f.read()
            signature_ok = verify_release(manifest_bytes, sig_bytes)
        except ImportError:
            if require_signed:
                # NOTE(review): still fails open — the verifier is missing,
                # so nothing can be checked. Decide whether enforcement
                # should refuse to load extensions in this situation.
                logger.warning(
                    "master_key not available — cannot verify extension '%s' "
                    "(HART_REQUIRE_SIGNED_EXTENSIONS=1)", entry)
            else:
                logger.debug("master_key not available — skipping signature verification")
            continue
        except Exception as e:
            logger.warning("Extension '%s' signature check error: %s", entry, e)
            if require_signed:
                # Fail closed: an extension that could not be positively
                # verified must not slip through under enforcement.
                _quarantine_extension(ext_path, '.badsig')
            continue

        if not signature_ok:
            logger.warning(
                "Extension '%s' has INVALID signature — %s",
                entry, "skipping" if require_signed else "loading anyway (advisory)")
            if require_signed:
                _quarantine_extension(ext_path, '.badsig')