Coverage for core / platform / bootstrap.py: 62.3%

199 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Platform Bootstrap — One-time initialization that wires all platform services. 

3 

4Called once at server start (from hart_intelligence or tests). 

5Registers core services, migrates existing shell_manifest.py panels, 

6detects native apps, loads extensions, and starts lifecycle services. 

7 

8Usage: 

9 from core.platform.bootstrap import bootstrap_platform 

10 registry = bootstrap_platform() 

11 # All services now available: 

12 # registry.get('events') -> EventBus 

13 # registry.get('apps') -> AppRegistry 

14 # registry.get('extensions') -> ExtensionRegistry 

15""" 

16 

17import logging 

18import os 

19import shutil 

20from typing import Optional 

21 

22from core.platform.registry import ServiceRegistry, get_registry 

23from core.platform.events import EventBus 

24from core.platform.cache import CacheService 

25from core.platform.app_manifest import AppManifest, AppType 

26from core.platform.app_registry import AppRegistry 

27from core.platform.extensions import ExtensionRegistry 

28 

29logger = logging.getLogger('hevolve.platform') 

30 

31 

def bootstrap_platform(extensions_dir: Optional[str] = None) -> ServiceRegistry:
    """One-time platform initialization. Returns the global registry.

    Idempotent — safe to call multiple times (no-ops if already bootstrapped).
    Registration ORDER matters: 'events' is created first because the
    AppRegistry / ExtensionRegistry / EnvironmentManager factories all close
    over ``bus.emit``, and ``registry.start_all()`` must run last so every
    lifecycle service is registered before startup.

    Args:
        extensions_dir: Optional path to scan for extensions.
                        Defaults to 'extensions/' relative to repo root.

    Returns:
        The global ServiceRegistry with all core services registered.
    """
    registry = get_registry()

    # Guard against double-bootstrap — 'events' is always the first service
    # registered below, so its presence means bootstrap already completed.
    if registry.has('events'):
        return registry

    # ── Register Core Platform Services ───────────────────────

    # EventBus — decoupled communication
    registry.register('events', EventBus, singleton=True)
    bus = registry.get('events')

    # RSI-4: realtime self-improvement trigger. Binds the usage-signal
    # tracker to EventBus topics so chat-turn / goal-failed / tool-error
    # signals feed the autoresearch cadence axis. bind_to_bus() itself
    # is cheap (n callbacks, no threads); the enqueue leg only fires
    # when HEVOLVE_RSI_REALTIME=1 — so wiring it unconditionally keeps
    # the tracker's counters populated for dashboards even when the
    # promotion arm is off. Promoted candidates still pass RSI-1 +
    # RSI-2 gates inside autoevolve_code_tools.commit_improvement.
    try:
        from integrations.agent_engine.rsi_trigger import get_rsi_trigger
        get_rsi_trigger().bind_to_bus()
    except Exception as e:
        # Fail-open: the platform must boot even without the RSI arm.
        logger.debug('rsi_trigger bind skipped: %s', e)

    # CacheService — unified in-memory + optional disk cache
    registry.register('cache', CacheService, singleton=True)

    # AppRegistry — central app catalog (wired to EventBus)
    registry.register('apps', lambda: AppRegistry(event_emitter=bus.emit),
                      singleton=True)
    apps = registry.get('apps')

    # ExtensionRegistry — platform extension lifecycle
    registry.register('extensions',
                      lambda: ExtensionRegistry(
                          service_registry=registry,
                          event_emitter=bus.emit),
                      singleton=True)

    # CapabilityRouter — resolves AI capability intents to backends.
    # Imports are deferred into the factory so minimal environments never
    # pay for them; model registry / VRAM manager are optional collaborators.
    def _make_capability_router():
        from core.platform.ai_capabilities import CapabilityRouter
        mr, vm = None, None
        try:
            from integrations.agent_engine.model_registry import model_registry
            mr = model_registry
        except ImportError:
            pass
        try:
            from integrations.service_tools.vram_manager import vram_manager
            vm = vram_manager
        except ImportError:
            pass
        return CapabilityRouter(model_registry=mr, vram_manager=vm)

    registry.register('capability_router', _make_capability_router, singleton=True)

    # EnvironmentManager — agent execution environments
    from core.platform.agent_environment import EnvironmentManager
    registry.register('environments',
                      lambda: EnvironmentManager(
                          service_registry=registry,
                          event_emitter=bus.emit),
                      singleton=True)

    # ── Migrate Existing Shell Manifest ───────────────────────

    _migrate_shell_manifest(apps)

    # ── Detect Native Apps (Rust/C++ binaries) ────────────────

    _register_native_apps(apps)

    # ── Load Extensions ───────────────────────────────────────

    if extensions_dir is None:
        # Default: extensions/ relative to repo root
        # (three dirname() hops: bootstrap.py -> platform/ -> core/ -> repo root)
        repo_root = os.path.dirname(os.path.dirname(
            os.path.dirname(os.path.abspath(__file__))))
        extensions_dir = os.path.join(repo_root, 'extensions')

    ext_reg = registry.get('extensions')
    if os.path.isdir(extensions_dir):
        # Verify extension signatures before loading (WS14)
        _verify_extension_signatures(extensions_dir)
        ext_reg.load_from_directory(extensions_dir)

    # ── Register Orchestrator Services (lazy, fail-open) ─────

    _register_orchestrator_services(registry)

    # ── PeerLink — P2P communication layer ────────────────────

    try:
        from core.peer_link.link_manager import get_link_manager
        from core.peer_link.telemetry import get_central_connection
        from core.peer_link.message_bus import get_message_bus

        registry.register('peer_link', get_link_manager, singleton=True)
        registry.register('message_bus', get_message_bus, singleton=True)
        registry.register('central_connection', get_central_connection, singleton=True)

        # Start PeerLink services
        link_mgr = registry.get('peer_link')
        link_mgr.start()

        central = registry.get('central_connection')
        central.start()

        logger.debug("PeerLink services registered")
    except Exception as e:
        # Fail-open: P2P layer is optional; boot continues without it.
        logger.debug("PeerLink not available: %s", e)

    # ── Connect EventBus to Crossbar WAMP (if configured) ────

    cburl = os.environ.get('CBURL')
    if cburl:
        bus.connect_wamp(cburl, os.environ.get('CBREALM', 'realm1'))

    # ── Wire EventBus subscribers for orphaned events ────────

    _wire_event_subscribers(bus)

    # ── Start Lifecycle Services ──────────────────────────────

    registry.start_all()

    total = apps.count()
    ext_count = ext_reg.count()
    logger.info("Platform bootstrapped: %d apps, %d extensions", total, ext_count)

    return registry

178 

179 

def _wire_event_subscribers(bus: EventBus) -> None:
    """Wire EventBus subscribers for events that have publishers but no consumers.

    These were identified as orphaned events in the event audit.
    Each subscriber is fail-safe — a broken handler never crashes the emitter:
    every handler body is wrapped in try/except so exceptions are logged
    (or deliberately dropped) instead of propagating into ``bus.emit``.

    Args:
        bus: The platform EventBus to attach handlers to via ``bus.on()``.
    """

    # 1. tts.speak → dispatch to TTS engine (voice output was silently dropped)
    def _on_tts_speak(topic, data):
        try:
            from integrations.channels.media.tts import TTSEngine
            import asyncio
            text = data.get('text', '')
            if not text:
                return
            engine = TTSEngine()  # defaults to Pocket TTS (offline, CPU)
            # A private event loop per event: handlers may fire from threads
            # that have no running loop, so create/close one locally.
            loop = asyncio.new_event_loop()
            try:
                audio = loop.run_until_complete(engine.synthesize(text))
                if audio:
                    logger.debug("TTS synthesized %d bytes for user %s",
                                 len(audio), data.get('user_id', ''))
                    # Push audio to fleet command channel for mobile playback.
                    # Uses cmd_type (not 'command') to match the registry in
                    # services/fleetCommandHandler.js (COMMAND_HANDLERS).
                    try:
                        from core.peer_link.message_bus import get_message_bus
                        get_message_bus().publish('fleet.command.user', {
                            'cmd_type': 'tts_stream',
                            'text': text[:500],
                            'audio_size': len(audio),
                        }, user_id=data.get('user_id', ''))
                    except Exception:
                        pass
            finally:
                loop.close()
        except Exception as e:
            logger.debug("TTS speak handler error: %s", e)

    bus.on('tts.speak', _on_tts_speak)

    # 2. action.retry_exhausted → log + audit trail for failed retries
    def _on_retry_exhausted(topic, data):
        action_id = data.get('action_id', '')
        max_retries = data.get('max_retries', 0)
        logger.warning("Action %s exhausted %d retries — marking failed",
                       action_id, max_retries)
        try:
            from security.immutable_audit_log import get_audit_log
            get_audit_log().log_event(
                'action_retry_exhausted', actor_id=action_id,
                action=f'exhausted {max_retries} retries',
                detail={'max_retries': max_retries, 'user_id': data.get('user_id', '')})
        except Exception:
            # Audit log is best-effort; the warning above already recorded it.
            pass

    bus.on('action.retry_exhausted', _on_retry_exhausted)

    # 3. memory.item_deleted → sync deletion to federation (item_added already syncs)
    def _on_memory_deleted(topic, data):
        try:
            from integrations.agent_engine.federated_aggregator import get_federated_aggregator
            agg = get_federated_aggregator()
            # NOTE(review): reaches into the aggregator's private
            # `_event_counters` dict to bump a counter — presumably only
            # telemetry, not an actual federated delete; confirm intent.
            if agg and hasattr(agg, '_event_counters'):
                agg._event_counters['memory.item_deleted'] = (
                    agg._event_counters.get('memory.item_deleted', 0) + 1)
        except Exception:
            pass

    bus.on('memory.item_deleted', _on_memory_deleted)

    # 4. security.extension_blocked → audit log (security events must not be silent)
    def _on_extension_blocked(topic, data):
        module = data.get('module', 'unknown')
        violations = data.get('violations', [])
        logger.warning("Extension blocked: %s (violations: %s)", module, violations)
        try:
            from security.immutable_audit_log import get_audit_log
            get_audit_log().log_event(
                'extension_blocked', actor_id=module,
                action=f'extension blocked: {module}',
                detail={'violations': violations})
        except Exception:
            pass

    bus.on('security.extension_blocked', _on_extension_blocked)

    # 5. notification.unconfirmed → push to mobile via fleet command
    def _on_unconfirmed(topic, data):
        user_id = data.get('user_id', '')
        if not user_id:
            return
        # Publish with cmd_type to match the RN COMMAND_HANDLERS registry.
        # Uses fleet.command.user topic for fan-out (no specific device_id).
        try:
            from core.peer_link.message_bus import get_message_bus
            get_message_bus().publish('fleet.command.user', {
                'cmd_type': 'notification_unconfirmed',
                'msg_id': data.get('msg_id', ''),
                'topic': data.get('topic', ''),
            }, user_id=user_id)
        except Exception:
            pass

    bus.on('notification.unconfirmed', _on_unconfirmed)

    # 6. dashboard invalidation → push SSE event so the React Admin UI
    #    re-fetches immediately instead of waiting for its next 5s poll.
    #    The SSE event also lets clients SKIP polling entirely between
    #    invalidations (the new AgentDashboardPage subscribes to
    #    `dashboard.invalidate` and only re-fetches on event + 30s
    #    heartbeat fallback). All of these source events already exist;
    #    this subscriber is the canonical fan-in point — adding more
    #    sources later means subscribing them to ONE of these topics,
    #    not duplicating the SSE bridge.
    def _on_dashboard_invalidate(topic, data):
        try:
            from core.platform.events import broadcast_sse_safe
            broadcast_sse_safe('dashboard.invalidate', {
                'reason': topic,  # the originating topic, for client-side filtering
                'detail': data if isinstance(data, dict) else {},
            }, user_id=None)  # broadcast — admin UI is a single-user surface
        except Exception:
            pass

    # One shared handler fans five source topics into dashboard.invalidate.
    for _topic in ('agent_goal.changed', 'coding_goal.changed',
                   'action_state.changed', 'daemon.status.changed',
                   'inference.completed'):
        bus.on(_topic, _on_dashboard_invalidate)

    logger.debug("EventBus subscribers wired: tts.speak, action.retry_exhausted, "
                 "memory.item_deleted, security.extension_blocked, notification.unconfirmed, "
                 "dashboard.invalidate")

313 

314 

def _migrate_shell_manifest(apps: AppRegistry) -> None:
    """Import existing shell_manifest.py panels into AppRegistry.

    Backward compatible — if shell_manifest.py can't be imported
    (e.g., during isolated testing), this gracefully no-ops.

    Args:
        apps: Target AppRegistry that receives the migrated panel manifests.
    """
    try:
        from integrations.agent_engine.shell_manifest import (
            PANEL_MANIFEST, SYSTEM_PANELS, DYNAMIC_PANELS,
        )
        # Static + system panels go through the registry's bulk loaders.
        panel_count = apps.load_panel_manifest(PANEL_MANIFEST)
        system_count = apps.load_system_panels(SYSTEM_PANELS)

        # Dynamic panels — register only ids not already in the catalog.
        # Fix: the original rebuilt [m.id for m in apps.list_all()] on every
        # iteration (O(n^2) membership test); snapshot the ids into a set
        # once and keep it in sync as panels are registered.
        existing_ids = {m.id for m in apps.list_all()}
        dynamic_count = 0
        for panel_id, panel in DYNAMIC_PANELS.items():
            if panel_id in existing_ids:
                continue
            manifest = AppManifest(
                id=panel_id,
                name=panel.get('title', panel_id),
                version='1.0.0',
                type=AppType.DYNAMIC_PANEL.value,
                icon=panel.get('icon', 'open_in_new'),
                entry={'route': panel.get('route', '')},
                group=panel.get('group', ''),
                default_size=tuple(panel.get('default_size', [700, 500])),
            )
            apps.register(manifest)
            existing_ids.add(panel_id)  # mirror the registration in the set
            dynamic_count += 1

        logger.debug("Migrated %d panels, %d system, %d dynamic from shell_manifest",
                     panel_count, system_count, dynamic_count)

    except ImportError:
        logger.debug("shell_manifest.py not available — skipping migration")
    except Exception as e:
        logger.warning("Shell manifest migration failed: %s", e)

352 

353 

def _register_native_apps(apps: AppRegistry) -> None:
    """Detect and register installed native binaries.

    Uses shutil.which() for binary detection — same pattern as
    service_manager.py EngineService.detect().
    Only registers apps that are actually installed on this system.

    Args:
        apps: AppRegistry that receives a manifest per detected binary.
    """
    # Candidate manifests; each one's entry['exec'] is probed on PATH below.
    candidates = (
        AppManifest(
            id='nunba', name='Nunba', version='auto',
            type=AppType.DESKTOP_APP.value, icon='hub',
            entry={'exec': 'nunba',
                   'http': 'http://localhost:5000',
                   'backend': 'http://localhost:6777'},
            group='System', platforms=['linux', 'windows', 'macos'],
            permissions=['network', 'display'],
            description='HART OS companion app — chat, communities, agents',
            tags=['chat', 'agents', 'ui', 'nunba', 'hart'],
        ),
        AppManifest(
            id='rustdesk', name='RustDesk', version='auto',
            type=AppType.DESKTOP_APP.value, icon='desktop_windows',
            entry={'exec': 'rustdesk', 'bridge': 'rustdesk_bridge'},
            group='Remote', platforms=['linux', 'windows', 'macos'],
            permissions=['network', 'display', 'input'],
            description='Open-source remote desktop',
            tags=['remote', 'vnc', 'desktop'],
        ),
        AppManifest(
            id='sunshine', name='Sunshine', version='auto',
            type=AppType.SERVICE.value, icon='wb_sunny',
            entry={'exec': 'sunshine', 'bridge': 'sunshine_bridge',
                   'http': 'https://localhost:47990'},
            group='Remote', platforms=['linux', 'windows'],
            permissions=['network', 'display'],
            description='GPU-accelerated game streaming host',
            tags=['remote', 'streaming', 'gaming'],
        ),
        AppManifest(
            id='moonlight', name='Moonlight', version='auto',
            type=AppType.DESKTOP_APP.value, icon='nightlight',
            entry={'exec': 'moonlight', 'bridge': 'sunshine_bridge'},
            group='Remote', platforms=['linux', 'windows', 'macos'],
            permissions=['network', 'display'],
            description='Low-latency game streaming client',
            tags=['remote', 'streaming', 'gaming'],
        ),
    )

    found = 0
    for candidate in candidates:
        binary = candidate.entry.get('exec', '')
        # Skip manifests with no binary name or whose binary is not on PATH.
        if not binary or shutil.which(binary) is None:
            continue
        # Never clobber an app registered earlier under the same id.
        if apps.get(candidate.id):
            continue
        apps.register(candidate)
        found += 1
        logger.debug("Detected native app: %s", candidate.name)

    if found:
        logger.info("Detected %d native apps", found)

414 

415 

def _register_orchestrator_services(registry: ServiceRegistry) -> None:
    """Register agent orchestrator services as lazy platform services.

    These are production services (AgentDaemon, FederatedAggregator) that
    participate in the platform lifecycle. Lazy-loaded so tests and minimal
    environments don't pay the import cost.

    Args:
        registry: The platform ServiceRegistry to register services on.
    """

    def _make_agent_daemon():
        # AgentDaemon — background goal dispatch (import deferred to first use).
        try:
            from integrations.agent_engine.agent_daemon import agent_daemon
        except ImportError:
            logger.debug("AgentDaemon not available — skipping")
            return None
        return agent_daemon

    def _make_federated_aggregator():
        # FederatedAggregator — hive learning aggregation (deferred import).
        try:
            from integrations.agent_engine.federated_aggregator import (
                get_federated_aggregator,
            )
        except ImportError:
            logger.debug("FederatedAggregator not available — skipping")
            return None
        return get_federated_aggregator()

    # Register both lazy factories; a failed registration is never fatal.
    for service_name, factory in (('agent_daemon', _make_agent_daemon),
                                  ('federation', _make_federated_aggregator)):
        try:
            registry.register(service_name, factory, singleton=True)
        except Exception:
            pass

    # Remote Desktop Orchestrator — lazy startup on OS mode
    try:
        from integrations.remote_desktop.orchestrator import get_orchestrator
        import threading

        orchestrator = get_orchestrator()
        registry.register('remote_desktop', lambda: orchestrator, singleton=True)
        # Startup in background thread to avoid blocking boot
        boot_thread = threading.Thread(target=orchestrator.startup,
                                       daemon=True, name='orchestrator-boot')
        boot_thread.start()
    except ImportError:
        logger.debug("Remote Desktop Orchestrator not available — skipping")
    except Exception as e:
        logger.warning("Remote Desktop Orchestrator startup: %s", e)

467 

468 

def _verify_extension_signatures(extensions_dir: str) -> None:
    """Verify extension manifest signatures before loading.

    Logs a warning for unsigned or invalid-signed extensions.
    Does NOT block loading — verification is advisory for now.
    Production environments should set HART_REQUIRE_SIGNED_EXTENSIONS=1
    to enforce signatures; in that mode offending extension directories
    are renamed with a '.unsigned' / '.badsig' suffix so the subsequent
    load_from_directory() pass skips them.

    Args:
        extensions_dir: Directory whose immediate subdirectories are
            candidate extensions (each expected to hold a manifest.json
            and, if signed, a manifest.sig).
    """
    require_signed = os.environ.get('HART_REQUIRE_SIGNED_EXTENSIONS', '') == '1'

    for entry in os.listdir(extensions_dir):
        ext_path = os.path.join(extensions_dir, entry)
        if not os.path.isdir(ext_path):
            continue

        manifest_path = os.path.join(ext_path, 'manifest.json')
        sig_path = os.path.join(ext_path, 'manifest.sig')

        # No manifest — not an extension directory; ignore silently.
        if not os.path.isfile(manifest_path):
            continue

        if not os.path.isfile(sig_path):
            if require_signed:
                logger.warning(
                    "Extension '%s' has no signature — skipping (HART_REQUIRE_SIGNED_EXTENSIONS=1)",
                    entry)
                # Remove from directory listing so load_from_directory skips it.
                # NOTE(review): if '<dir>.unsigned' already exists the rename
                # fails and the OSError is swallowed, leaving the unsigned
                # extension loadable — confirm whether that is acceptable.
                try:
                    os.rename(ext_path, ext_path + '.unsigned')
                except OSError:
                    pass
            else:
                logger.debug("Extension '%s' is unsigned (advisory)", entry)
            continue

        # Verify signature using master public key.
        # (Fix: removed a dead `import json` — json was never used here.)
        try:
            from security.master_key import verify_release
            with open(manifest_path, 'rb') as f:
                manifest_bytes = f.read()
            with open(sig_path, 'rb') as f:
                sig_bytes = f.read()

            if not verify_release(manifest_bytes, sig_bytes):
                logger.warning(
                    "Extension '%s' has INVALID signature — %s",
                    entry, "skipping" if require_signed else "loading anyway (advisory)")
                if require_signed:
                    try:
                        os.rename(ext_path, ext_path + '.badsig')
                    except OSError:
                        pass
        except ImportError:
            # master_key ships only in signed builds; verification is advisory.
            logger.debug("master_key not available — skipping signature verification")
        except Exception as e:
            logger.warning("Extension '%s' signature check error: %s", entry, e)