Coverage for integrations / vision / vision_service.py: 66.3%

389 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2VisionService — manages the MiniCPM sidecar, WebSocket frame receiver, 

3and periodic description loop with intelligent adaptive sampling. 

4 

5Architecture: 

6 VisionService.start() 

7 +-> MiniCPM subprocess (port 9891) [sidecar] 

8 +-> WebSocket server (port 5460) [receives camera/screen frames] 

9 +-> Description loop (adaptive interval) [sends frame to MiniCPM only when scene changes] 

10 +-> FrameStore [in-process, replaces Redis] 

11 +-> Visual trigger evaluation [fires callbacks on description match] 

12""" 

13import asyncio 

14import json 

15import logging 

16import os 

17import subprocess 

18import sys 

19import threading 

20import time 

21from typing import Callable, Dict, List, Optional 

22 

23import numpy as np 

24import requests 

25from core.http_pool import pooled_get, pooled_post 

26 

27from .frame_store import FrameStore, compute_frame_difference, decode_jpeg 

28from .minicpm_installer import MiniCPMInstaller 

29from .lightweight_backend import get_vision_backend, VisionBackend 

30 

31logger = logging.getLogger('hevolve_vision') 

32 

33 

34class VisionService: 

35 """Orchestrates the vision pipeline: sidecar + frames + descriptions. 

36 

37 Intelligent sampling: only calls MiniCPM when the scene actually changes. 

38 Adaptive intervals: describes more often when active, backs off when static. 

39 Visual triggers: delegates to TriggerManager (VISUAL_MATCH / SCREEN_MATCH). 

40 """ 

41 

    def __init__(
        self,
        minicpm_port: Optional[int] = None,
        ws_port: Optional[int] = None,
        description_interval: float = 4.0,
        max_description_interval: float = 30.0,
        min_scene_change: float = 0.01,
        frame_store: Optional[FrameStore] = None,
        minicpm_model_dir: Optional[str] = None,
        config_path: Optional[str] = None,
        callback_url: Optional[str] = None,
        trigger_manager=None,
    ):
        """Wire up ports, stores and sampling state; no threads or I/O start here.

        Args:
            minicpm_port: MiniCPM sidecar port. Precedence: env
                HEVOLVE_MINICPM_PORT, then this argument, then the port
                registry's 'vision' port.
            ws_port: WebSocket frame-receiver port. Precedence: env
                VISION_WS_PORT, then this argument, then the registry's
                'websocket' port.
            description_interval: fastest per-user describe interval (seconds).
            max_description_interval: backoff cap for static scenes (seconds).
            min_scene_change: frame-difference threshold below which a frame
                counts as unchanged and is skipped.
            frame_store: injected FrameStore (useful in tests); a fresh
                in-process store otherwise.
            minicpm_model_dir: override for the MiniCPM model directory.
            config_path: explicit path to the embodied-AI JSON config.
            callback_url: base URL for /create_action DB callbacks; in
                bundled mode the local server URL is used when not given.
            trigger_manager: optional TriggerManager for visual triggers.
        """
        # Local import: keeps port_registry out of module import time.
        from core.port_registry import get_port
        # Environment variables win over explicit arguments (deploy override).
        self._minicpm_port = int(os.environ.get('HEVOLVE_MINICPM_PORT', minicpm_port or get_port('vision')))
        self._ws_port = int(os.environ.get('VISION_WS_PORT', ws_port or get_port('websocket')))
        self._description_interval = description_interval
        self._max_interval = max_description_interval
        self._min_scene_change = min_scene_change
        self.store = frame_store or FrameStore()
        # NOTE(review): a throwaway MiniCPMInstaller() is constructed just to
        # read its default model_dir — presumably cheap; confirm the
        # constructor does no filesystem/network work.
        self._installer = MiniCPMInstaller(
            model_dir=minicpm_model_dir or MiniCPMInstaller().model_dir
        )
        self._config = self._load_config(config_path)
        # In bundled Nunba mode, use local server for action/DB callbacks
        from core.config_cache import is_bundled, get_db_url
        if not callback_url and is_bundled():
            self._callback_url = get_db_url()
        else:
            self._callback_url = callback_url or self._config.get('database_url')
        self._trigger_manager = trigger_manager  # Optional TriggerManager

        # Runtime handles — populated by start()/_start_minicpm().
        self._minicpm_process: Optional[subprocess.Popen] = None
        self._ws_thread: Optional[threading.Thread] = None
        self._desc_thread: Optional[threading.Thread] = None
        self._running = False

        # Circuit breaker for MiniCPM health
        self._consecutive_failures = 0
        self._max_failures = 5
        self._circuit_open = False

        # Lightweight vision backend (auto-selected when MiniCPM unavailable)
        self._vision_backend: Optional[VisionBackend] = None

        # Intelligent sampling state (per-user); keys are "user_id:channel".
        self._last_described_frame: Dict[str, np.ndarray] = {}  # user_id → numpy
        self._user_intervals: Dict[str, float] = {}  # user_id → current interval
        self._last_describe_time: Dict[str, float] = {}  # user_id → timestamp
        self._frames_skipped: int = 0
        self._frames_described: int = 0

93 

94 # ─── Public API ─── 

95 

    def start(self, mode: str = 'auto'):
        """Start the vision pipeline (non-blocking).

        Spawns the WebSocket receiver and description loop as daemon
        threads, then registers the loaded VLM with the orchestrator.

        Args:
            mode: 'full' (MiniCPM+WS+desc), 'lite' (lightweight backend+WS+desc),
                'headless' (FrameStore only), 'auto' (detect from hardware tier).
        """
        if self._running:
            logger.warning("VisionService already running")
            return

        self._running = True

        # Auto-detect mode from hardware tier
        if mode == 'auto':
            mode = self._detect_mode()

        if mode == 'headless':
            # Headless still counts as "running": the FrameStore serves frames.
            logger.info("VisionService started in headless mode (FrameStore only)")
            return

        if mode == 'lite':
            # Use lightweight CPU backend instead of MiniCPM sidecar
            self._vision_backend = get_vision_backend()
            if self._vision_backend.name == 'none':
                # NOTE(review): this early-return (and the one below) leaves
                # _running True, i.e. the service degrades to headless rather
                # than stopped — confirm that is the intended semantics.
                logger.warning("No vision backend available — headless mode")
                return
            if not self._vision_backend.start():
                logger.warning(f"Failed to start {self._vision_backend.name} — headless mode")
                self._vision_backend = None
                return
            logger.info(f"Vision backend: {self._vision_backend.name} "
                        f"(RAM: {self._vision_backend.ram_mb}MB)")
        else:
            # T8 / vision-unification: ONE auto-start path = get_vision_backend().
            # Earlier code branched into MiniCPMInstaller.install() here,
            # which contradicted the comment below ("MiniCPM sidecar is NOT
            # auto-started") and resulted in a parallel 4GB VRAM sidecar
            # silently winning the fallback chain over the lightweight
            # Qwen 0.8B path even on 8GB cards (witnessed 2026-04-22 in
            # gui_app.log: "Vision: using minicpm backend" while catalog
            # had selected vlm-qwen08b).
            #
            # Removed the `_installer.install()` branch entirely. Users on
            # 12+ GB COMPUTE_HOST machines who want MiniCPM still get it via
            # the admin "Models" page (explicit load + lifecycle pinning);
            # the auto-start path is now exclusively get_vision_backend(),
            # which prefers Qwen 0.8B (the pinned dual-purpose draft +
            # captioner already running on port 8081).
            self._vision_backend = get_vision_backend()
            if self._vision_backend.name != 'none':
                if hasattr(self._vision_backend, 'start'):
                    self._vision_backend.start()
                logger.info(
                    f"Vision: using {self._vision_backend.name} backend "
                    f"(requires_gpu={self._vision_backend.requires_gpu}, "
                    f"ram_mb={self._vision_backend.ram_mb})")
            else:
                # Unlike the 'lite' branch, full mode with no backend stops
                # the service outright (no threads are worth running).
                logger.warning("No vision backend available — headless mode")
                self._vision_backend = None
                self._running = False
                return

        # Frame ingress (raw WebSocket) and the adaptive description loop
        # run on their own daemon threads; start() stays non-blocking.
        self._ws_thread = threading.Thread(
            target=self._run_ws_server, daemon=True, name='vision-ws',
        )
        self._ws_thread.start()

        self._desc_thread = threading.Thread(
            target=self._description_loop, daemon=True, name='vision-desc',
        )
        self._desc_thread.start()

        backend_name = self._vision_backend.name if self._vision_backend else 'none'
        logger.info(f"VisionService started (backend={backend_name}, adaptive sampling)")

        # Sync with orchestrator catalog so it knows VLM is loaded.
        # Device label was inverted pre-2026-04-22: `device = 'cpu' if
        # self._vision_backend else 'gpu'` — when the MiniCPMBackend
        # wrapper IS the live backend (a GPU sidecar) we logged 'cpu',
        # and when no backend was set we logged 'gpu' (impossible: there
        # is no live model to be on a GPU). Drive device off the
        # backend's own `requires_gpu` flag instead.
        try:
            from integrations.service_tools.model_orchestrator import get_orchestrator
            if self._vision_backend is not None:
                device = 'gpu' if self._vision_backend.requires_gpu else 'cpu'
                model_name_for_catalog = self._vision_backend.name
            else:
                device = 'none'
                model_name_for_catalog = 'none'
            get_orchestrator().notify_loaded(
                'vlm', model_name_for_catalog, device=device,
            )
        except Exception:
            # Catalog sync is best-effort; vision keeps running without it.
            pass

192 

193 def _detect_mode(self) -> str: 

194 """Detect vision mode from hardware tier.""" 

195 try: 

196 from security.system_requirements import get_capabilities 

197 caps = get_capabilities() 

198 tier = getattr(caps, 'tier_name', '').lower() 

199 if tier == 'embedded': 

200 return 'headless' 

201 elif tier in ('observer', 'lite'): 

202 return 'lite' 

203 else: 

204 return 'full' 

205 except Exception: 

206 return 'full' 

207 

208 def _cleanup_subprocess(self): 

209 """atexit handler — ensures MiniCPM subprocess is killed on exit.""" 

210 if self._minicpm_process and self._minicpm_process.poll() is None: 

211 try: 

212 self._minicpm_process.terminate() 

213 self._minicpm_process.wait(timeout=3) 

214 except Exception: 

215 try: 

216 self._minicpm_process.kill() 

217 except Exception: 

218 pass 

219 

220 def stop(self): 

221 """Stop all vision components.""" 

222 self._running = False 

223 if self._minicpm_process: 

224 self._minicpm_process.terminate() 

225 try: 

226 self._minicpm_process.wait(timeout=5) 

227 except subprocess.TimeoutExpired: 

228 self._minicpm_process.kill() 

229 self._minicpm_process = None 

230 logger.info("MiniCPM sidecar stopped") 

231 if self._vision_backend is not None: 

232 self._vision_backend.stop() 

233 logger.info(f"Vision backend {self._vision_backend.name} stopped") 

234 self._vision_backend = None 

235 

236 # Sync with orchestrator catalog 

237 try: 

238 from integrations.service_tools.model_orchestrator import get_orchestrator 

239 get_orchestrator().notify_unloaded('vlm', 'MiniCPM-V-2') 

240 except Exception: 

241 pass 

242 logger.info( 

243 f"VisionService stopped (described={self._frames_described}, " 

244 f"skipped={self._frames_skipped})" 

245 ) 

246 

247 def get_frame(self, user_id: str) -> Optional[bytes]: 

248 """Get latest camera frame for a user.""" 

249 return self.store.get_frame(user_id) 

250 

251 def get_description(self, user_id: str) -> Optional[str]: 

252 """Get latest camera scene description for a user.""" 

253 return self.store.get_description(user_id) 

254 

255 def get_screen_description(self, user_id: str) -> Optional[str]: 

256 """Get latest screen description for a user.""" 

257 return self.store.get_screen_description(user_id) 

258 

    def describe_screen_frame(self, user_id: str, frame_bytes: bytes) -> Optional[str]:
        """Describe a screen capture frame via the active backend and store it.

        Uses intelligent sampling: skips (returns None) if the screen
        hasn't changed enough since the last described frame, or if the
        circuit breaker is open / the service is stopped.
        """
        if self._circuit_open or not self._running:
            return None

        # Intelligent sampling: check if screen changed
        if not self._should_describe(user_id, frame_bytes, channel='screen'):
            self._frames_skipped += 1
            return None

        # Persist the raw frame only for frames we actually describe.
        self.store.put_screen_frame(user_id, frame_bytes)

        desc = self._describe_frame(
            user_id, frame_bytes,
            prompt='describe what is on the computer screen in 20 words'
        )
        if desc:
            self.store.put_screen_description(user_id, desc)
            self._post_description_to_db(
                user_id, desc, label='Screen Context',
                zeroshot_label='Screen Reasoning'
            )
            self._record_to_world_model(user_id, desc, 'screen', frame_bytes)
            self._evaluate_visual_triggers(user_id, desc, 'screen')
            self._frames_described += 1
        # NOTE(review): unlike the background _description_loop screen path,
        # this entry point does not call _save_to_memory_graph or
        # _emit_perception_event — confirm whether that asymmetry is intended.
        return desc

288 

    def register_visual_trigger(
        self,
        channel: str,
        callback: Callable[[Dict], None],
        conditions: Optional[List] = None,
        keywords: Optional[List[str]] = None,
        pattern: Optional[str] = None,
        cooldown_seconds: int = 0,
        name: Optional[str] = None,
    ):
        """Register a trigger that fires when a description matches conditions.

        Delegates to TriggerManager (VISUAL_MATCH / SCREEN_MATCH types).

        Args:
            channel: 'camera' or 'screen'
            callback: fn({'user_id': str, 'description': str, 'channel': str})
            conditions: list of TriggerCondition objects (optional)
            keywords: list of keywords to match in description (optional)
            pattern: regex pattern to match description (optional)
            cooldown_seconds: minimum seconds between fires
            name: optional trigger name
        """
        if self._trigger_manager is None:
            # Lazily create a manager so registration works even when the
            # service was constructed without one.
            from integrations.channels.automation.triggers import TriggerManager
            self._trigger_manager = TriggerManager()

        from integrations.channels.automation.triggers import TriggerType
        # Any channel other than 'camera' is treated as a screen trigger.
        trigger_type = TriggerType.VISUAL_MATCH if channel == 'camera' else TriggerType.SCREEN_MATCH
        self._trigger_manager.register(
            trigger_type=trigger_type,
            callback=callback,
            name=name,
            conditions=conditions,
            keywords=keywords,
            pattern=pattern,
            cooldown_seconds=cooldown_seconds,
        )

327 

328 def is_running(self) -> bool: 

329 """True when the service has been started and hasn't stopped. 

330 

331 Exposed so callers (admin toggles, agent tools, approval handlers) 

332 can check state without reaching into `_running` directly — keeps 

333 the flag private to the class and gives tests a single mock target. 

334 """ 

335 return bool(self._running) 

336 

    def get_status(self) -> Dict:
        """Return service status for health dashboards.

        NOTE: when no lightweight backend is active, this probes the
        MiniCPM /status endpoint, which also mutates the circuit-breaker
        counters (_consecutive_failures / _circuit_open) as a side effect.
        """
        # 'minicpm' is the display default when no lightweight backend is set.
        backend_name = self._vision_backend.name if self._vision_backend else 'minicpm'
        # Only probe the sidecar when it is the active path.
        minicpm_alive = (self._check_minicpm_health()
                         if self._vision_backend is None else False)
        return {
            'running': self._running,
            'backend': backend_name,
            'minicpm_alive': minicpm_alive,
            'minicpm_port': self._minicpm_port,
            'ws_port': self._ws_port,
            'circuit_open': self._circuit_open,
            'consecutive_failures': self._consecutive_failures,
            'frames_described': self._frames_described,
            'frames_skipped': self._frames_skipped,
            'visual_triggers': self._trigger_manager.get_stats()['total_triggers'] if self._trigger_manager else 0,
            'installer': self._installer.get_status(),
            'store': self.store.stats(),
        }

356 

357 # ─── Intelligent Sampling ─── 

358 

    def _should_describe(
        self, user_id: str, frame_bytes: bytes, channel: str = 'camera'
    ) -> bool:
        """Decide whether this frame needs a new MiniCPM description.

        Returns True if scene changed significantly since last description.
        Also manages per-user adaptive intervals.

        State is keyed per "user_id:channel" so camera and screen sampling
        back off independently.
        """
        key = f"{user_id}:{channel}"
        current = decode_jpeg(frame_bytes)
        if current is None:
            # Undecodable frame — never describe garbage.
            return False

        last = self._last_described_frame.get(key)
        if last is None:
            # First frame for this user/channel — always describe
            # NOTE(review): _last_describe_time is not stamped here, so the
            # very next frame bypasses the time gate (last_time defaults to
            # 0) and is gated only by the diff check — confirm intended.
            self._last_described_frame[key] = current
            self._user_intervals[key] = self._description_interval
            return True

        # Check if enough time has passed (per-user adaptive interval)
        now = time.time()
        last_time = self._last_describe_time.get(key, 0)
        interval = self._user_intervals.get(key, self._description_interval)
        if now - last_time < interval:
            return False

        # Compute frame difference
        diff = compute_frame_difference(last, current)

        if diff > self._min_scene_change:
            # Scene changed — describe and reset interval to fast
            self._last_described_frame[key] = current
            self._user_intervals[key] = self._description_interval
            self._last_describe_time[key] = now
            return True
        else:
            # Static — back off (×1.5, capped)
            # The reference frame is intentionally NOT updated here, so
            # slow drift still accumulates until it crosses the threshold.
            self._user_intervals[key] = min(interval * 1.5, self._max_interval)
            self._last_describe_time[key] = now
            return False

400 

401 # ─── Visual Triggers ─── 

402 

403 def _evaluate_visual_triggers( 

404 self, user_id: str, description: str, channel: str 

405 ): 

406 """Evaluate registered triggers against a new description. 

407 

408 Delegates to TriggerManager — zero extra compute, piggybacks on 

409 descriptions already produced by MiniCPM. 

410 """ 

411 if self._trigger_manager is None: 

412 return 

413 

414 from integrations.channels.automation.triggers import TriggerType 

415 trigger_type = TriggerType.VISUAL_MATCH if channel == 'camera' else TriggerType.SCREEN_MATCH 

416 event_data = { 

417 'user_id': user_id, 

418 'description': description, 

419 'channel': channel, 

420 'timestamp': time.time(), 

421 } 

422 try: 

423 self._trigger_manager.evaluate(trigger_type, event_data) 

424 except Exception as e: 

425 logger.debug(f"Visual trigger evaluation error: {e}") 

426 

427 # ─── Face Signature Enrollment ─── 

428 

429 def _enroll_face_signature(self, user_id: str, frame_bytes: bytes): 

430 """Dispatch face enrollment to HevolveAI via WorldModelBridge. 

431 

432 Piggybacks on existing frame processing. Zero extra I/O. 

433 Enrolls up to 5 face samples per user, then stops. 

434 All ML (embedding extraction, matching) runs in HevolveAI. 

435 """ 

436 try: 

437 from core.resonance_profile import get_or_create_profile 

438 profile = get_or_create_profile(user_id) 

439 if profile.face_enrollment_count >= 5: 

440 return # Already enrolled enough samples 

441 from core.resonance_identifier import ResonanceIdentifier 

442 identifier = ResonanceIdentifier() 

443 if identifier.enroll_face(user_id, frame_bytes): 

444 logger.debug(f"Face enrollment dispatched to HevolveAI for user {user_id}") 

445 except ImportError: 

446 pass 

447 except Exception as e: 

448 logger.debug(f"Face enrollment skipped: {e}") 

449 

450 # ─── Sidecar Management ─── 

451 

    def _start_minicpm(self):
        """Launch the MiniCPM server as a subprocess.

        No-op (with an error log) when the model is not installed.  Blocks
        up to 120s in _wait_for_minicpm for the sidecar to report healthy.
        """
        model_dir = self._installer.get_model_dir()
        if not model_dir:
            logger.error("MiniCPM not installed — cannot start sidecar")
            return

        # In frozen builds (cx_Freeze), sys.executable is Nunba.exe — using it
        # with -m would launch a full GUI instance instead of the module.
        # Use the bundled python interpreter from python-embed/ instead.
        python_exe = sys.executable
        if getattr(sys, 'frozen', False):
            app_dir = os.path.dirname(sys.executable)
            embed_python = os.path.join(app_dir, 'python-embed', 'python.exe')
            if os.path.isfile(embed_python):
                python_exe = embed_python
            else:
                logger.warning("python-embed/python.exe not found — minicpm sidecar may not start correctly")

        cmd = [
            python_exe, '-m', 'integrations.vision.minicpm_server',
            '--model_dir', model_dir,
            '--port', str(self._minicpm_port),
        ]
        logger.info(f"Starting MiniCPM sidecar: {' '.join(cmd)}")

        # Sidecar output is discarded; on Windows suppress the console window.
        _popen_kw = dict(stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if sys.platform == 'win32':
            _popen_kw['creationflags'] = subprocess.CREATE_NO_WINDOW
        self._minicpm_process = subprocess.Popen(cmd, **_popen_kw)
        self._wait_for_minicpm(timeout=120)

483 

    def _wait_for_minicpm(self, timeout: float = 120) -> bool:
        """Poll MiniCPM health endpoint until ready.

        Polls every 2s; returns True once healthy, False when *timeout*
        elapses or the service is stopped while waiting.
        """
        start = time.time()
        while time.time() - start < timeout and self._running:
            if self._check_minicpm_health():
                logger.info("MiniCPM sidecar is healthy")
                return True
            time.sleep(2)
        logger.error(f"MiniCPM sidecar not healthy after {timeout}s")
        return False

494 

    def _check_minicpm_health(self) -> bool:
        """Check if MiniCPM sidecar is responding.

        Side effects: a 200 response resets the circuit breaker; any
        failure (network error or non-200) increments the consecutive
        failure count and opens the circuit at the threshold.
        """
        try:
            r = pooled_get(
                f'http://localhost:{self._minicpm_port}/status', timeout=3,
            )
            if r.status_code == 200:
                self._consecutive_failures = 0
                self._circuit_open = False
                return True
        except requests.RequestException:
            pass
        # Reached on connection error OR non-200 status.
        self._consecutive_failures += 1
        if self._consecutive_failures >= self._max_failures:
            self._circuit_open = True
        return False

511 

512 # ─── WebSocket Frame Receiver ─── 

513 

    def _run_ws_server(self):
        """Run async WebSocket server in a new event loop.

        Runs on the dedicated 'vision-ws' daemon thread, so a fresh event
        loop is created and torn down here rather than reusing any loop
        owned by the main thread.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._ws_serve())
        except Exception as e:
            logger.error(f"WebSocket server error: {e}")
        finally:
            loop.close()

524 

    async def _ws_serve(self):
        """Start WebSocket server and handle connections.

        Stage-C (Symptom #6, 2026-04-16) note on the WAMP rule:
        house-rule 6 mandates "ALL real-time push uses Crossbar WAMP".
        That rule applies to push-semantics messaging — chat replies,
        consent events, agent state, notifications. The consent event
        for camera / screen is now WAMP (see hart_intelligence_entry.
        agent_approval). Binary JPEG frame ingress at 5 FPS × ~11KB
        (~55 KB/s peak) is well under the 10 MB/s bandwidth ceiling,
        but Crossbar's HTTP-WAMP publish bridge does not natively
        chunk binary payloads and would require base64-encoding every
        frame + a subscriber that decodes — net loss in both latency
        and bytes-over-the-wire. Raw WebSocket on the same host is a
        local-loopback bulk-data channel (analogous to a Flask file
        upload), not a push-notification channel. Keeping it raw is
        the intended separation of concerns.

        Trade-off documented in commit message of the WAMP consent
        publish. If a future requirement forces consolidation, the
        switchpoint is websocket.serve -> crossbar_pub_async with
        chunking on the subscriber; no other code needs to change.
        """
        try:
            import websockets
        except ImportError:
            # Frame ingress is optional: without the library the rest of
            # the service (store, description of pushed frames) still works.
            logger.error("websockets not installed — frame receiver disabled")
            return

        server = await websockets.serve(self._ws_handler, '0.0.0.0', self._ws_port)
        # Read actual bound port (important when ws_port=0 for dynamic allocation)
        if server.sockets:
            actual_port = server.sockets[0].getsockname()[1]
            self._ws_port = actual_port
        logger.info(f"WebSocket frame receiver on port {self._ws_port}")
        try:
            # Keep the server alive until stop() clears _running; the 1s
            # sleep bounds shutdown latency.
            while self._running:
                await asyncio.sleep(1)
        finally:
            server.close()
            await server.wait_closed()

566 

    async def _ws_handler(self, websocket, path=None):
        """Handle a single WebSocket connection (one per user).

        Protocol:
            1. Client sends user_id (digit string)
            2. Client sends "video_start" (camera, default) or "screen_start"
            3. Client sends binary JPEG frames
            4. Client sends "video_stop" to end

        Binary frames received before a user_id has been sent are decoded
        but dropped (the user_id guard below).
        """
        # Local import: cv2 is heavy and only needed while frames flow.
        import cv2

        user_id = None
        channel = 'camera'  # default channel
        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    frame = cv2.imdecode(
                        np.frombuffer(message, np.uint8), cv2.IMREAD_COLOR,
                    )
                    if frame is not None and user_id:
                        # NOTE(review): fastNlMeansDenoisingColored is an
                        # expensive per-frame filter — confirm it keeps up
                        # at the expected frame rate on target hardware.
                        frame = cv2.fastNlMeansDenoisingColored(
                            frame, None, 10, 10, 7, 21
                        )
                        # Re-encode the denoised frame before storing.
                        _, encoded = cv2.imencode('.jpg', frame)
                        jpeg_bytes = encoded.tobytes()
                        if channel == 'screen':
                            self.store.put_screen_frame(user_id, jpeg_bytes)
                        else:
                            self.store.put_frame(user_id, jpeg_bytes)
                elif isinstance(message, str):
                    if message.isdigit():
                        user_id = message
                        logger.info(f"Frame session started for user {user_id}")
                    elif message == 'screen_start':
                        channel = 'screen'
                        logger.info(f"User {user_id} switched to screen channel")
                    elif message == 'video_start':
                        channel = 'camera'
                    elif message == 'video_stop':
                        break
        except Exception as e:
            # Disconnects surface here; treated as normal session end.
            logger.debug(f"WebSocket session ended: {e}")
        finally:
            if user_id:
                logger.info(f"Frame session ended for user {user_id} ({channel})")

612 

613 # ─── Description Loop ─── 

614 

    def _description_loop(self):
        """Periodically describe frames via MiniCPM with intelligent sampling.

        Only calls MiniCPM when frame difference exceeds threshold.
        Backs off interval for static scenes (4s → 8s → ... → 30s cap).
        Evaluates visual triggers after each new description.
        Processes both camera and screen channels.

        Runs on the 'vision-desc' daemon thread until stop() clears
        _running.
        """
        while self._running:
            if self._circuit_open:
                # Breaker open: slow down and re-probe health instead of
                # hammering a dead sidecar.
                time.sleep(self._description_interval * 2)
                self._check_minicpm_health()
                continue

            try:
                users = self.store.active_users()
                for user_id in users:
                    if not self._running:
                        return

                    # Camera channel
                    frame_bytes = self.store.get_frame(user_id)
                    if frame_bytes:
                        # Piggyback face signature enrollment (zero extra I/O)
                        self._enroll_face_signature(user_id, frame_bytes)
                        if self._should_describe(user_id, frame_bytes, 'camera'):
                            desc = self._describe_frame(user_id, frame_bytes)
                            if desc:
                                # Fan the new description out to every sink.
                                self.store.put_description(user_id, desc)
                                self._post_description_to_db(user_id, desc)
                                self._record_to_world_model(user_id, desc, 'camera', frame_bytes)
                                self._save_to_memory_graph(user_id, desc, 'camera')
                                self._emit_perception_event(user_id, desc, 'camera')
                                self._evaluate_visual_triggers(
                                    user_id, desc, 'camera'
                                )
                                self._frames_described += 1
                        else:
                            self._frames_skipped += 1

                    # Screen channel
                    screen_bytes = self.store.get_screen_frame(user_id)
                    if screen_bytes:
                        if self._should_describe(user_id, screen_bytes, 'screen'):
                            desc = self._describe_frame(
                                user_id, screen_bytes,
                                prompt='describe what is on the computer screen in 20 words',
                            )
                            if desc:
                                self.store.put_screen_description(user_id, desc)
                                self._post_description_to_db(
                                    user_id, desc,
                                    label='Screen Context',
                                    zeroshot_label='Screen Reasoning',
                                )
                                self._record_to_world_model(user_id, desc, 'screen', screen_bytes)
                                self._save_to_memory_graph(user_id, desc, 'screen')
                                self._emit_perception_event(user_id, desc, 'screen')
                                self._evaluate_visual_triggers(
                                    user_id, desc, 'screen'
                                )
                                self._frames_described += 1
                        else:
                            self._frames_skipped += 1
            except Exception as e:
                # One try wraps the whole user batch: an error for one user
                # skips the remaining users until the next loop iteration.
                logger.debug(f"Description loop error: {e}")

            # Check if vision backend should unload (no frames for IDLE_TIMEOUT_S)
            if self._vision_backend and hasattr(self._vision_backend, 'check_idle'):
                self._vision_backend.check_idle()

            # Sleep the minimum user interval (so we check the fastest user on time)
            min_interval = self._description_interval
            if self._user_intervals:
                min_interval = min(
                    min_interval,
                    min(self._user_intervals.values())
                )
            time.sleep(min_interval)

694 

695 def _describe_frame( 

696 self, user_id: str, frame_bytes: bytes, 

697 prompt: str = 'describe what the user is doing in 20 words', 

698 ) -> Optional[str]: 

699 """Describe a frame using the active backend (MiniCPM or lightweight).""" 

700 if self._circuit_open: 

701 return None 

702 

703 # Use lightweight backend if available 

704 if self._vision_backend is not None: 

705 try: 

706 return self._vision_backend.describe(frame_bytes) 

707 except Exception as e: 

708 logger.debug(f"Lightweight backend error: {e}") 

709 return None 

710 

711 # MiniCPM sidecar path 

712 try: 

713 r = pooled_post( 

714 f'http://localhost:{self._minicpm_port}/describe', 

715 data=frame_bytes, 

716 params={'prompt': prompt}, 

717 headers={'Content-Type': 'application/octet-stream'}, 

718 timeout=10, 

719 ) 

720 if r.status_code == 200: 

721 return r.json().get('result') 

722 except requests.RequestException: 

723 self._consecutive_failures += 1 

724 if self._consecutive_failures >= self._max_failures: 

725 self._circuit_open = True 

726 return None 

727 

728 def _post_description_to_db( 

729 self, user_id: str, description: str, 

730 label: str = 'Visual Context', 

731 zeroshot_label: str = 'Video Reasoning', 

732 ): 

733 """POST description to main server DB (/create_action pattern).""" 

734 if not self._callback_url: 

735 return 

736 try: 

737 pooled_post( 

738 f'{self._callback_url}/create_action', 

739 json={ 

740 'user_id': user_id, 

741 'conv_id': '0', 

742 'action': description[:100], 

743 'zeroshot_label': zeroshot_label, 

744 'gpt3_label': label, 

745 }, 

746 timeout=5, 

747 ) 

748 except requests.RequestException: 

749 pass 

750 

751 # ─── World Model Integration ─── 

752 

753 def _record_to_world_model(self, user_id: str, description: str, 

754 channel: str = 'camera', 

755 frame_bytes: bytes = None): 

756 """Feed scene descriptions AND raw frames to world model for learning. 

757 

758 Two data paths to HevolveAI: 

759 1. Text description -> record_interaction() (language learning) 

760 2. Raw frame bytes -> submit_sensor_frame() (visual prediction learning) 

761 Both are needed: text for semantic understanding, frames for 

762 autoregressive visual prediction error (predict next frame -> compare). 

763 """ 

764 try: 

765 from integrations.agent_engine.world_model_bridge import get_world_model_bridge 

766 bridge = get_world_model_bridge() 

767 model_id = self._vision_backend.name if self._vision_backend else 'minicpm-v2' 

768 

769 # 1. Text description (existing path) 

770 bridge.record_interaction( 

771 user_id=user_id, prompt_id=f'vision_{channel}', 

772 prompt=f'[{channel}] describe what you see', 

773 response=description, model_id=model_id) 

774 

775 # 2. Raw frame for visual encoding + autoregressive learning 

776 # Only when frame bytes available and scene changed (caller already filtered) 

777 if frame_bytes is not None: 

778 # Submit async to avoid blocking description loop 

779 self._flush_executor_submit( 

780 bridge.submit_sensor_frame, 

781 user_id, frame_bytes, channel, 

782 1.0 if channel == 'camera' else 0.0, 

783 ) 

784 except Exception: 

785 pass 

786 

787 def _flush_executor_submit(self, fn, *args): 

788 """Submit work to a background thread (reuse VisionService's thread pool).""" 

789 try: 

790 import concurrent.futures 

791 if not hasattr(self, '_frame_forward_executor'): 

792 self._frame_forward_executor = concurrent.futures.ThreadPoolExecutor( 

793 max_workers=1, thread_name_prefix='frame_fwd') 

794 self._frame_forward_executor.submit(fn, *args) 

795 except Exception: 

796 pass 

797 

798 # ─── Temporal Perception ─── 

799 

    def _save_to_memory_graph(self, user_id: str, description: str, channel: str):
        """Auto-save visual description to MemoryGraph for long-term recall.

        Singleton accessor for hart_intelligence — never eager-imports
        from a worker thread (vision pipeline runs in background loops
        that would deadlock on the canonical loader's import lock).

        All failure modes are logged at debug level and swallowed; memory
        persistence is best-effort.
        """
        try:
            from core.safe_hartos_attr import safe_hartos_attr
            _get_or_create_graph = safe_hartos_attr('_get_or_create_graph')
            if _get_or_create_graph is None:
                logger.debug(
                    "Memory graph save skipped: HARTOS "
                    "_get_or_create_graph unresolvable — user=%s channel=%s",
                    user_id, channel,
                )
                return
            graph = _get_or_create_graph(user_id)
            if graph:
                # Tagged so later recall can filter by channel or by
                # 'visual' across channels.
                graph.add(
                    content=description,
                    metadata={'channel': channel, 'type': 'visual_context'},
                    tags=['visual', channel],
                )
                logger.debug(
                    "Memory graph save: user=%s channel=%s desc_len=%d",
                    user_id, channel, len(description or ''),
                )
        except Exception as e:
            logger.debug(
                "Memory graph save failed: user=%s channel=%s err=%s",
                user_id, channel, e,
            )

833 

834 def _emit_perception_event(self, user_id: str, description: str, channel: str): 

835 """Emit present-tense perception event on EventBus.""" 

836 try: 

837 from core.platform.events import emit_event 

838 emit_event('perception.vision.present', { 

839 'user_id': user_id, 'channel': channel, 

840 'content': description, 'timestamp': time.time(), 

841 }) 

842 except Exception: 

843 pass 

844 

845 # ─── Config ─── 

846 

847 def _load_config(self, config_path: Optional[str]) -> Dict: 

848 """Load embodied AI config if available.""" 

849 if config_path and os.path.isfile(config_path): 

850 try: 

851 with open(config_path) as f: 

852 return json.load(f) 

853 except Exception: 

854 pass 

855 

856 default = os.path.join( 

857 os.path.expanduser('~'), '.hevolve', 'embodied_ai_config.json' 

858 ) 

859 if os.path.isfile(default): 

860 try: 

861 with open(default) as f: 

862 return json.load(f) 

863 except Exception: 

864 pass 

865 

866 return {}