Coverage for integrations / vision / vision_service.py: 66.3%
389 statements
« prev ^ index » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2VisionService — manages the MiniCPM sidecar, WebSocket frame receiver,
3and periodic description loop with intelligent adaptive sampling.
5Architecture:
6 VisionService.start()
7 +-> MiniCPM subprocess (port 9891) [sidecar]
8 +-> WebSocket server (port 5460) [receives camera/screen frames]
9 +-> Description loop (adaptive interval) [sends frame to MiniCPM only when scene changes]
10 +-> FrameStore [in-process, replaces Redis]
11 +-> Visual trigger evaluation [fires callbacks on description match]
12"""
13import asyncio
14import json
15import logging
16import os
17import subprocess
18import sys
19import threading
20import time
21from typing import Callable, Dict, List, Optional
23import numpy as np
24import requests
25from core.http_pool import pooled_get, pooled_post
27from .frame_store import FrameStore, compute_frame_difference, decode_jpeg
28from .minicpm_installer import MiniCPMInstaller
29from .lightweight_backend import get_vision_backend, VisionBackend
31logger = logging.getLogger('hevolve_vision')
34class VisionService:
35 """Orchestrates the vision pipeline: sidecar + frames + descriptions.
37 Intelligent sampling: only calls MiniCPM when the scene actually changes.
38 Adaptive intervals: describes more often when active, backs off when static.
39 Visual triggers: delegates to TriggerManager (VISUAL_MATCH / SCREEN_MATCH).
40 """
    def __init__(
        self,
        minicpm_port: int = None,
        ws_port: int = None,
        description_interval: float = 4.0,
        max_description_interval: float = 30.0,
        min_scene_change: float = 0.01,
        frame_store: Optional[FrameStore] = None,
        minicpm_model_dir: Optional[str] = None,
        config_path: Optional[str] = None,
        callback_url: Optional[str] = None,
        trigger_manager=None,
    ):
        """Initialize the service; no network/process I/O happens here.

        Args:
            minicpm_port: MiniCPM sidecar HTTP port. The HEVOLVE_MINICPM_PORT
                env var wins over this argument, which wins over the port
                registry's 'vision' entry.
            ws_port: WebSocket frame-receiver port. VISION_WS_PORT env var
                wins; falls back to the registry's 'websocket' entry.
            description_interval: fastest (baseline) describe interval, seconds.
            max_description_interval: cap for the adaptive back-off, seconds.
            min_scene_change: frame-difference threshold; below it a scene is
                treated as static and description is skipped.
            frame_store: optional injected FrameStore (e.g. for tests);
                a fresh in-process store is created when omitted.
            minicpm_model_dir: override for the MiniCPM model directory.
            config_path: explicit path to the embodied AI JSON config.
            callback_url: base URL for DB/action callbacks; when omitted it is
                resolved from bundled mode or the loaded config.
            trigger_manager: optional TriggerManager; otherwise created lazily
                by register_visual_trigger().
        """
        # Lazy import: port_registry is only needed at construction time.
        from core.port_registry import get_port
        self._minicpm_port = int(os.environ.get('HEVOLVE_MINICPM_PORT', minicpm_port or get_port('vision')))
        self._ws_port = int(os.environ.get('VISION_WS_PORT', ws_port or get_port('websocket')))
        self._description_interval = description_interval
        self._max_interval = max_description_interval
        self._min_scene_change = min_scene_change
        self.store = frame_store or FrameStore()
        # NOTE(review): MiniCPMInstaller() is constructed a second time just to
        # read its default model_dir when no override is given — presumably
        # cheap; confirm the installer ctor has no side effects.
        self._installer = MiniCPMInstaller(
            model_dir=minicpm_model_dir or MiniCPMInstaller().model_dir
        )
        self._config = self._load_config(config_path)
        # In bundled Nunba mode, use local server for action/DB callbacks
        from core.config_cache import is_bundled, get_db_url
        if not callback_url and is_bundled():
            self._callback_url = get_db_url()
        else:
            self._callback_url = callback_url or self._config.get('database_url')
        self._trigger_manager = trigger_manager  # Optional TriggerManager

        # Runtime handles — populated by start()/_start_minicpm().
        self._minicpm_process: Optional[subprocess.Popen] = None
        self._ws_thread: Optional[threading.Thread] = None
        self._desc_thread: Optional[threading.Thread] = None
        self._running = False

        # Circuit breaker for MiniCPM health
        self._consecutive_failures = 0
        self._max_failures = 5
        self._circuit_open = False

        # Lightweight vision backend (auto-selected when MiniCPM unavailable)
        self._vision_backend: Optional[VisionBackend] = None

        # Intelligent sampling state (per-user)
        self._last_described_frame: Dict[str, np.ndarray] = {}  # user_id → numpy
        self._user_intervals: Dict[str, float] = {}  # user_id → current interval
        self._last_describe_time: Dict[str, float] = {}  # user_id → timestamp
        self._frames_skipped: int = 0
        self._frames_described: int = 0
94 # ─── Public API ───
96 def start(self, mode: str = 'auto'):
97 """Start the vision pipeline (non-blocking).
99 Args:
100 mode: 'full' (MiniCPM+WS+desc), 'lite' (lightweight backend+WS+desc),
101 'headless' (FrameStore only), 'auto' (detect from hardware tier).
102 """
103 if self._running:
104 logger.warning("VisionService already running")
105 return
107 self._running = True
109 # Auto-detect mode from hardware tier
110 if mode == 'auto':
111 mode = self._detect_mode()
113 if mode == 'headless':
114 logger.info("VisionService started in headless mode (FrameStore only)")
115 return
117 if mode == 'lite':
118 # Use lightweight CPU backend instead of MiniCPM sidecar
119 self._vision_backend = get_vision_backend()
120 if self._vision_backend.name == 'none':
121 logger.warning("No vision backend available — headless mode")
122 return
123 if not self._vision_backend.start():
124 logger.warning(f"Failed to start {self._vision_backend.name} — headless mode")
125 self._vision_backend = None
126 return
127 logger.info(f"Vision backend: {self._vision_backend.name} "
128 f"(RAM: {self._vision_backend.ram_mb}MB)")
129 else:
130 # T8 / vision-unification: ONE auto-start path = get_vision_backend().
131 # Earlier code branched into MiniCPMInstaller.install() here,
132 # which contradicted the comment below ("MiniCPM sidecar is NOT
133 # auto-started") and resulted in a parallel 4GB VRAM sidecar
134 # silently winning the fallback chain over the lightweight
135 # Qwen 0.8B path even on 8GB cards (witnessed 2026-04-22 in
136 # gui_app.log: "Vision: using minicpm backend" while catalog
137 # had selected vlm-qwen08b).
138 #
139 # Removed the `_installer.install()` branch entirely. Users on
140 # 12+ GB COMPUTE_HOST machines who want MiniCPM still get it via
141 # the admin "Models" page (explicit load + lifecycle pinning);
142 # the auto-start path is now exclusively get_vision_backend(),
143 # which prefers Qwen 0.8B (the pinned dual-purpose draft +
144 # captioner already running on port 8081).
145 self._vision_backend = get_vision_backend()
146 if self._vision_backend.name != 'none':
147 if hasattr(self._vision_backend, 'start'):
148 self._vision_backend.start()
149 logger.info(
150 f"Vision: using {self._vision_backend.name} backend "
151 f"(requires_gpu={self._vision_backend.requires_gpu}, "
152 f"ram_mb={self._vision_backend.ram_mb})")
153 else:
154 logger.warning("No vision backend available — headless mode")
155 self._vision_backend = None
156 self._running = False
157 return
159 self._ws_thread = threading.Thread(
160 target=self._run_ws_server, daemon=True, name='vision-ws',
161 )
162 self._ws_thread.start()
164 self._desc_thread = threading.Thread(
165 target=self._description_loop, daemon=True, name='vision-desc',
166 )
167 self._desc_thread.start()
169 backend_name = self._vision_backend.name if self._vision_backend else 'none'
170 logger.info(f"VisionService started (backend={backend_name}, adaptive sampling)")
172 # Sync with orchestrator catalog so it knows VLM is loaded.
173 # Device label was inverted pre-2026-04-22: `device = 'cpu' if
174 # self._vision_backend else 'gpu'` — when the MiniCPMBackend
175 # wrapper IS the live backend (a GPU sidecar) we logged 'cpu',
176 # and when no backend was set we logged 'gpu' (impossible: there
177 # is no live model to be on a GPU). Drive device off the
178 # backend's own `requires_gpu` flag instead.
179 try:
180 from integrations.service_tools.model_orchestrator import get_orchestrator
181 if self._vision_backend is not None:
182 device = 'gpu' if self._vision_backend.requires_gpu else 'cpu'
183 model_name_for_catalog = self._vision_backend.name
184 else:
185 device = 'none'
186 model_name_for_catalog = 'none'
187 get_orchestrator().notify_loaded(
188 'vlm', model_name_for_catalog, device=device,
189 )
190 except Exception:
191 pass
193 def _detect_mode(self) -> str:
194 """Detect vision mode from hardware tier."""
195 try:
196 from security.system_requirements import get_capabilities
197 caps = get_capabilities()
198 tier = getattr(caps, 'tier_name', '').lower()
199 if tier == 'embedded':
200 return 'headless'
201 elif tier in ('observer', 'lite'):
202 return 'lite'
203 else:
204 return 'full'
205 except Exception:
206 return 'full'
208 def _cleanup_subprocess(self):
209 """atexit handler — ensures MiniCPM subprocess is killed on exit."""
210 if self._minicpm_process and self._minicpm_process.poll() is None:
211 try:
212 self._minicpm_process.terminate()
213 self._minicpm_process.wait(timeout=3)
214 except Exception:
215 try:
216 self._minicpm_process.kill()
217 except Exception:
218 pass
220 def stop(self):
221 """Stop all vision components."""
222 self._running = False
223 if self._minicpm_process:
224 self._minicpm_process.terminate()
225 try:
226 self._minicpm_process.wait(timeout=5)
227 except subprocess.TimeoutExpired:
228 self._minicpm_process.kill()
229 self._minicpm_process = None
230 logger.info("MiniCPM sidecar stopped")
231 if self._vision_backend is not None:
232 self._vision_backend.stop()
233 logger.info(f"Vision backend {self._vision_backend.name} stopped")
234 self._vision_backend = None
236 # Sync with orchestrator catalog
237 try:
238 from integrations.service_tools.model_orchestrator import get_orchestrator
239 get_orchestrator().notify_unloaded('vlm', 'MiniCPM-V-2')
240 except Exception:
241 pass
242 logger.info(
243 f"VisionService stopped (described={self._frames_described}, "
244 f"skipped={self._frames_skipped})"
245 )
247 def get_frame(self, user_id: str) -> Optional[bytes]:
248 """Get latest camera frame for a user."""
249 return self.store.get_frame(user_id)
251 def get_description(self, user_id: str) -> Optional[str]:
252 """Get latest camera scene description for a user."""
253 return self.store.get_description(user_id)
255 def get_screen_description(self, user_id: str) -> Optional[str]:
256 """Get latest screen description for a user."""
257 return self.store.get_screen_description(user_id)
259 def describe_screen_frame(self, user_id: str, frame_bytes: bytes) -> Optional[str]:
260 """Describe a screen capture frame via MiniCPM and store the result.
262 Uses intelligent sampling: skips if screen hasn't changed.
263 """
264 if self._circuit_open or not self._running:
265 return None
267 # Intelligent sampling: check if screen changed
268 if not self._should_describe(user_id, frame_bytes, channel='screen'):
269 self._frames_skipped += 1
270 return None
272 self.store.put_screen_frame(user_id, frame_bytes)
274 desc = self._describe_frame(
275 user_id, frame_bytes,
276 prompt='describe what is on the computer screen in 20 words'
277 )
278 if desc:
279 self.store.put_screen_description(user_id, desc)
280 self._post_description_to_db(
281 user_id, desc, label='Screen Context',
282 zeroshot_label='Screen Reasoning'
283 )
284 self._record_to_world_model(user_id, desc, 'screen', frame_bytes)
285 self._evaluate_visual_triggers(user_id, desc, 'screen')
286 self._frames_described += 1
287 return desc
289 def register_visual_trigger(
290 self,
291 channel: str,
292 callback: Callable[[Dict], None],
293 conditions: Optional[List] = None,
294 keywords: Optional[List[str]] = None,
295 pattern: Optional[str] = None,
296 cooldown_seconds: int = 0,
297 name: Optional[str] = None,
298 ):
299 """Register a trigger that fires when a description matches conditions.
301 Delegates to TriggerManager (VISUAL_MATCH / SCREEN_MATCH types).
303 Args:
304 channel: 'camera' or 'screen'
305 callback: fn({'user_id': str, 'description': str, 'channel': str})
306 conditions: list of TriggerCondition objects (optional)
307 keywords: list of keywords to match in description (optional)
308 pattern: regex pattern to match description (optional)
309 cooldown_seconds: minimum seconds between fires
310 name: optional trigger name
311 """
312 if self._trigger_manager is None:
313 from integrations.channels.automation.triggers import TriggerManager
314 self._trigger_manager = TriggerManager()
316 from integrations.channels.automation.triggers import TriggerType
317 trigger_type = TriggerType.VISUAL_MATCH if channel == 'camera' else TriggerType.SCREEN_MATCH
318 self._trigger_manager.register(
319 trigger_type=trigger_type,
320 callback=callback,
321 name=name,
322 conditions=conditions,
323 keywords=keywords,
324 pattern=pattern,
325 cooldown_seconds=cooldown_seconds,
326 )
328 def is_running(self) -> bool:
329 """True when the service has been started and hasn't stopped.
331 Exposed so callers (admin toggles, agent tools, approval handlers)
332 can check state without reaching into `_running` directly — keeps
333 the flag private to the class and gives tests a single mock target.
334 """
335 return bool(self._running)
337 def get_status(self) -> Dict:
338 """Return service status for health dashboards."""
339 backend_name = self._vision_backend.name if self._vision_backend else 'minicpm'
340 minicpm_alive = (self._check_minicpm_health()
341 if self._vision_backend is None else False)
342 return {
343 'running': self._running,
344 'backend': backend_name,
345 'minicpm_alive': minicpm_alive,
346 'minicpm_port': self._minicpm_port,
347 'ws_port': self._ws_port,
348 'circuit_open': self._circuit_open,
349 'consecutive_failures': self._consecutive_failures,
350 'frames_described': self._frames_described,
351 'frames_skipped': self._frames_skipped,
352 'visual_triggers': self._trigger_manager.get_stats()['total_triggers'] if self._trigger_manager else 0,
353 'installer': self._installer.get_status(),
354 'store': self.store.stats(),
355 }
357 # ─── Intelligent Sampling ───
359 def _should_describe(
360 self, user_id: str, frame_bytes: bytes, channel: str = 'camera'
361 ) -> bool:
362 """Decide whether this frame needs a new MiniCPM description.
364 Returns True if scene changed significantly since last description.
365 Also manages per-user adaptive intervals.
366 """
367 key = f"{user_id}:{channel}"
368 current = decode_jpeg(frame_bytes)
369 if current is None:
370 return False
372 last = self._last_described_frame.get(key)
373 if last is None:
374 # First frame for this user/channel — always describe
375 self._last_described_frame[key] = current
376 self._user_intervals[key] = self._description_interval
377 return True
379 # Check if enough time has passed (per-user adaptive interval)
380 now = time.time()
381 last_time = self._last_describe_time.get(key, 0)
382 interval = self._user_intervals.get(key, self._description_interval)
383 if now - last_time < interval:
384 return False
386 # Compute frame difference
387 diff = compute_frame_difference(last, current)
389 if diff > self._min_scene_change:
390 # Scene changed — describe and reset interval to fast
391 self._last_described_frame[key] = current
392 self._user_intervals[key] = self._description_interval
393 self._last_describe_time[key] = now
394 return True
395 else:
396 # Static — back off (×1.5, capped)
397 self._user_intervals[key] = min(interval * 1.5, self._max_interval)
398 self._last_describe_time[key] = now
399 return False
401 # ─── Visual Triggers ───
403 def _evaluate_visual_triggers(
404 self, user_id: str, description: str, channel: str
405 ):
406 """Evaluate registered triggers against a new description.
408 Delegates to TriggerManager — zero extra compute, piggybacks on
409 descriptions already produced by MiniCPM.
410 """
411 if self._trigger_manager is None:
412 return
414 from integrations.channels.automation.triggers import TriggerType
415 trigger_type = TriggerType.VISUAL_MATCH if channel == 'camera' else TriggerType.SCREEN_MATCH
416 event_data = {
417 'user_id': user_id,
418 'description': description,
419 'channel': channel,
420 'timestamp': time.time(),
421 }
422 try:
423 self._trigger_manager.evaluate(trigger_type, event_data)
424 except Exception as e:
425 logger.debug(f"Visual trigger evaluation error: {e}")
427 # ─── Face Signature Enrollment ───
429 def _enroll_face_signature(self, user_id: str, frame_bytes: bytes):
430 """Dispatch face enrollment to HevolveAI via WorldModelBridge.
432 Piggybacks on existing frame processing. Zero extra I/O.
433 Enrolls up to 5 face samples per user, then stops.
434 All ML (embedding extraction, matching) runs in HevolveAI.
435 """
436 try:
437 from core.resonance_profile import get_or_create_profile
438 profile = get_or_create_profile(user_id)
439 if profile.face_enrollment_count >= 5:
440 return # Already enrolled enough samples
441 from core.resonance_identifier import ResonanceIdentifier
442 identifier = ResonanceIdentifier()
443 if identifier.enroll_face(user_id, frame_bytes):
444 logger.debug(f"Face enrollment dispatched to HevolveAI for user {user_id}")
445 except ImportError:
446 pass
447 except Exception as e:
448 logger.debug(f"Face enrollment skipped: {e}")
450 # ─── Sidecar Management ───
452 def _start_minicpm(self):
453 """Launch the MiniCPM server as a subprocess."""
454 model_dir = self._installer.get_model_dir()
455 if not model_dir:
456 logger.error("MiniCPM not installed — cannot start sidecar")
457 return
459 # In frozen builds (cx_Freeze), sys.executable is Nunba.exe — using it
460 # with -m would launch a full GUI instance instead of the module.
461 # Use the bundled python interpreter from python-embed/ instead.
462 python_exe = sys.executable
463 if getattr(sys, 'frozen', False):
464 app_dir = os.path.dirname(sys.executable)
465 embed_python = os.path.join(app_dir, 'python-embed', 'python.exe')
466 if os.path.isfile(embed_python):
467 python_exe = embed_python
468 else:
469 logger.warning("python-embed/python.exe not found — minicpm sidecar may not start correctly")
471 cmd = [
472 python_exe, '-m', 'integrations.vision.minicpm_server',
473 '--model_dir', model_dir,
474 '--port', str(self._minicpm_port),
475 ]
476 logger.info(f"Starting MiniCPM sidecar: {' '.join(cmd)}")
478 _popen_kw = dict(stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
479 if sys.platform == 'win32':
480 _popen_kw['creationflags'] = subprocess.CREATE_NO_WINDOW
481 self._minicpm_process = subprocess.Popen(cmd, **_popen_kw)
482 self._wait_for_minicpm(timeout=120)
484 def _wait_for_minicpm(self, timeout: float = 120):
485 """Poll MiniCPM health endpoint until ready."""
486 start = time.time()
487 while time.time() - start < timeout and self._running:
488 if self._check_minicpm_health():
489 logger.info("MiniCPM sidecar is healthy")
490 return True
491 time.sleep(2)
492 logger.error(f"MiniCPM sidecar not healthy after {timeout}s")
493 return False
495 def _check_minicpm_health(self) -> bool:
496 """Check if MiniCPM sidecar is responding."""
497 try:
498 r = pooled_get(
499 f'http://localhost:{self._minicpm_port}/status', timeout=3,
500 )
501 if r.status_code == 200:
502 self._consecutive_failures = 0
503 self._circuit_open = False
504 return True
505 except requests.RequestException:
506 pass
507 self._consecutive_failures += 1
508 if self._consecutive_failures >= self._max_failures:
509 self._circuit_open = True
510 return False
512 # ─── WebSocket Frame Receiver ───
514 def _run_ws_server(self):
515 """Run async WebSocket server in a new event loop."""
516 loop = asyncio.new_event_loop()
517 asyncio.set_event_loop(loop)
518 try:
519 loop.run_until_complete(self._ws_serve())
520 except Exception as e:
521 logger.error(f"WebSocket server error: {e}")
522 finally:
523 loop.close()
    async def _ws_serve(self):
        """Start WebSocket server and handle connections.

        Stage-C (Symptom #6, 2026-04-16) note on the WAMP rule:
        house-rule 6 mandates "ALL real-time push uses Crossbar WAMP".
        That rule applies to push-semantics messaging — chat replies,
        consent events, agent state, notifications. The consent event
        for camera / screen is now WAMP (see hart_intelligence_entry.
        agent_approval). Binary JPEG frame ingress at 5 FPS × ~11KB
        (~55 KB/s peak) is well under the 10 MB/s bandwidth ceiling,
        but Crossbar's HTTP-WAMP publish bridge does not natively
        chunk binary payloads and would require base64-encoding every
        frame + a subscriber that decodes — net loss in both latency
        and bytes-over-the-wire. Raw WebSocket on the same host is a
        local-loopback bulk-data channel (analogous to a Flask file
        upload), not a push-notification channel. Keeping it raw is
        the intended separation of concerns.

        Trade-off documented in commit message of the WAMP consent
        publish. If a future requirement forces consolidation, the
        switchpoint is websocket.serve -> crossbar_pub_async with
        chunking on the subscriber; no other code needs to change.
        """
        try:
            import websockets
        except ImportError:
            # Optional dependency — the rest of the service still works.
            logger.error("websockets not installed — frame receiver disabled")
            return

        server = await websockets.serve(self._ws_handler, '0.0.0.0', self._ws_port)
        # Read actual bound port (important when ws_port=0 for dynamic allocation)
        if server.sockets:
            actual_port = server.sockets[0].getsockname()[1]
            self._ws_port = actual_port
        logger.info(f"WebSocket frame receiver on port {self._ws_port}")
        try:
            # Keep the coroutine alive until stop() clears _running;
            # 1 s polling bounds shutdown latency.
            while self._running:
                await asyncio.sleep(1)
        finally:
            # Always close the listener cleanly, even on cancellation.
            server.close()
            await server.wait_closed()
    async def _ws_handler(self, websocket, path=None):
        """Handle a single WebSocket connection (one per user).

        Protocol:
            1. Client sends user_id (digit string)
            2. Client sends "video_start" (camera, default) or "screen_start"
            3. Client sends binary JPEG frames
            4. Client sends "video_stop" to end

        `path` is accepted but unused — keeps the handler compatible with
        both the 2-arg and 1-arg websockets handler signatures.
        """
        import cv2

        user_id = None
        channel = 'camera'  # default channel
        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    # Binary payload — a JPEG frame. Frames arriving before
                    # the user_id handshake are silently dropped.
                    frame = cv2.imdecode(
                        np.frombuffer(message, np.uint8), cv2.IMREAD_COLOR,
                    )
                    if frame is not None and user_id:
                        # NOTE(review): fastNlMeansDenoisingColored is a
                        # CPU-heavy per-frame operation — presumably fine at
                        # the low ingress FPS documented above; confirm
                        # before raising the frame rate.
                        frame = cv2.fastNlMeansDenoisingColored(
                            frame, None, 10, 10, 7, 21
                        )
                        _, encoded = cv2.imencode('.jpg', frame)
                        jpeg_bytes = encoded.tobytes()
                        if channel == 'screen':
                            self.store.put_screen_frame(user_id, jpeg_bytes)
                        else:
                            self.store.put_frame(user_id, jpeg_bytes)
                elif isinstance(message, str):
                    # Text payload — a control message.
                    if message.isdigit():
                        user_id = message
                        logger.info(f"Frame session started for user {user_id}")
                    elif message == 'screen_start':
                        channel = 'screen'
                        logger.info(f"User {user_id} switched to screen channel")
                    elif message == 'video_start':
                        channel = 'camera'
                    elif message == 'video_stop':
                        break
        except Exception as e:
            # Connection drops surface here; treated as a normal session end.
            logger.debug(f"WebSocket session ended: {e}")
        finally:
            if user_id:
                logger.info(f"Frame session ended for user {user_id} ({channel})")
613 # ─── Description Loop ───
    def _description_loop(self):
        """Periodically describe frames via MiniCPM with intelligent sampling.

        Only calls the model when frame difference exceeds threshold.
        Backs off interval for static scenes (4s → 8s → ... → 30s cap).
        Evaluates visual triggers after each new description.
        Processes both camera and screen channels.
        Runs on the 'vision-desc' daemon thread until _running goes False.
        """
        while self._running:
            if self._circuit_open:
                # Circuit breaker tripped — probe health at a relaxed cadence
                # instead of hammering an unresponsive sidecar.
                time.sleep(self._description_interval * 2)
                self._check_minicpm_health()
                continue

            try:
                users = self.store.active_users()
                for user_id in users:
                    if not self._running:
                        return

                    # Camera channel
                    frame_bytes = self.store.get_frame(user_id)
                    if frame_bytes:
                        # Piggyback face signature enrollment (zero extra I/O)
                        self._enroll_face_signature(user_id, frame_bytes)
                        if self._should_describe(user_id, frame_bytes, 'camera'):
                            desc = self._describe_frame(user_id, frame_bytes)
                            if desc:
                                # Fan out in fixed order: store, DB, world
                                # model, memory graph, event bus, triggers.
                                self.store.put_description(user_id, desc)
                                self._post_description_to_db(user_id, desc)
                                self._record_to_world_model(user_id, desc, 'camera', frame_bytes)
                                self._save_to_memory_graph(user_id, desc, 'camera')
                                self._emit_perception_event(user_id, desc, 'camera')
                                self._evaluate_visual_triggers(
                                    user_id, desc, 'camera'
                                )
                                self._frames_described += 1
                        else:
                            self._frames_skipped += 1

                    # Screen channel
                    screen_bytes = self.store.get_screen_frame(user_id)
                    if screen_bytes:
                        if self._should_describe(user_id, screen_bytes, 'screen'):
                            desc = self._describe_frame(
                                user_id, screen_bytes,
                                prompt='describe what is on the computer screen in 20 words',
                            )
                            if desc:
                                self.store.put_screen_description(user_id, desc)
                                self._post_description_to_db(
                                    user_id, desc,
                                    label='Screen Context',
                                    zeroshot_label='Screen Reasoning',
                                )
                                self._record_to_world_model(user_id, desc, 'screen', screen_bytes)
                                self._save_to_memory_graph(user_id, desc, 'screen')
                                self._emit_perception_event(user_id, desc, 'screen')
                                self._evaluate_visual_triggers(
                                    user_id, desc, 'screen'
                                )
                                self._frames_described += 1
                        else:
                            self._frames_skipped += 1
            except Exception as e:
                # Best-effort loop: one user's failure must not kill the
                # thread for everyone.
                logger.debug(f"Description loop error: {e}")

            # Check if vision backend should unload (no frames for IDLE_TIMEOUT_S)
            if self._vision_backend and hasattr(self._vision_backend, 'check_idle'):
                self._vision_backend.check_idle()

            # Sleep the minimum user interval (so we check the fastest user on time)
            min_interval = self._description_interval
            if self._user_intervals:
                min_interval = min(
                    min_interval,
                    min(self._user_intervals.values())
                )
            time.sleep(min_interval)
695 def _describe_frame(
696 self, user_id: str, frame_bytes: bytes,
697 prompt: str = 'describe what the user is doing in 20 words',
698 ) -> Optional[str]:
699 """Describe a frame using the active backend (MiniCPM or lightweight)."""
700 if self._circuit_open:
701 return None
703 # Use lightweight backend if available
704 if self._vision_backend is not None:
705 try:
706 return self._vision_backend.describe(frame_bytes)
707 except Exception as e:
708 logger.debug(f"Lightweight backend error: {e}")
709 return None
711 # MiniCPM sidecar path
712 try:
713 r = pooled_post(
714 f'http://localhost:{self._minicpm_port}/describe',
715 data=frame_bytes,
716 params={'prompt': prompt},
717 headers={'Content-Type': 'application/octet-stream'},
718 timeout=10,
719 )
720 if r.status_code == 200:
721 return r.json().get('result')
722 except requests.RequestException:
723 self._consecutive_failures += 1
724 if self._consecutive_failures >= self._max_failures:
725 self._circuit_open = True
726 return None
728 def _post_description_to_db(
729 self, user_id: str, description: str,
730 label: str = 'Visual Context',
731 zeroshot_label: str = 'Video Reasoning',
732 ):
733 """POST description to main server DB (/create_action pattern)."""
734 if not self._callback_url:
735 return
736 try:
737 pooled_post(
738 f'{self._callback_url}/create_action',
739 json={
740 'user_id': user_id,
741 'conv_id': '0',
742 'action': description[:100],
743 'zeroshot_label': zeroshot_label,
744 'gpt3_label': label,
745 },
746 timeout=5,
747 )
748 except requests.RequestException:
749 pass
751 # ─── World Model Integration ───
753 def _record_to_world_model(self, user_id: str, description: str,
754 channel: str = 'camera',
755 frame_bytes: bytes = None):
756 """Feed scene descriptions AND raw frames to world model for learning.
758 Two data paths to HevolveAI:
759 1. Text description -> record_interaction() (language learning)
760 2. Raw frame bytes -> submit_sensor_frame() (visual prediction learning)
761 Both are needed: text for semantic understanding, frames for
762 autoregressive visual prediction error (predict next frame -> compare).
763 """
764 try:
765 from integrations.agent_engine.world_model_bridge import get_world_model_bridge
766 bridge = get_world_model_bridge()
767 model_id = self._vision_backend.name if self._vision_backend else 'minicpm-v2'
769 # 1. Text description (existing path)
770 bridge.record_interaction(
771 user_id=user_id, prompt_id=f'vision_{channel}',
772 prompt=f'[{channel}] describe what you see',
773 response=description, model_id=model_id)
775 # 2. Raw frame for visual encoding + autoregressive learning
776 # Only when frame bytes available and scene changed (caller already filtered)
777 if frame_bytes is not None:
778 # Submit async to avoid blocking description loop
779 self._flush_executor_submit(
780 bridge.submit_sensor_frame,
781 user_id, frame_bytes, channel,
782 1.0 if channel == 'camera' else 0.0,
783 )
784 except Exception:
785 pass
787 def _flush_executor_submit(self, fn, *args):
788 """Submit work to a background thread (reuse VisionService's thread pool)."""
789 try:
790 import concurrent.futures
791 if not hasattr(self, '_frame_forward_executor'):
792 self._frame_forward_executor = concurrent.futures.ThreadPoolExecutor(
793 max_workers=1, thread_name_prefix='frame_fwd')
794 self._frame_forward_executor.submit(fn, *args)
795 except Exception:
796 pass
798 # ─── Temporal Perception ───
800 def _save_to_memory_graph(self, user_id: str, description: str, channel: str):
801 """Auto-save visual description to MemoryGraph for long-term recall.
803 Singleton accessor for hart_intelligence — never eager-imports
804 from a worker thread (vision pipeline runs in background loops
805 that would deadlock on the canonical loader's import lock).
806 """
807 try:
808 from core.safe_hartos_attr import safe_hartos_attr
809 _get_or_create_graph = safe_hartos_attr('_get_or_create_graph')
810 if _get_or_create_graph is None:
811 logger.debug(
812 "Memory graph save skipped: HARTOS "
813 "_get_or_create_graph unresolvable — user=%s channel=%s",
814 user_id, channel,
815 )
816 return
817 graph = _get_or_create_graph(user_id)
818 if graph:
819 graph.add(
820 content=description,
821 metadata={'channel': channel, 'type': 'visual_context'},
822 tags=['visual', channel],
823 )
824 logger.debug(
825 "Memory graph save: user=%s channel=%s desc_len=%d",
826 user_id, channel, len(description or ''),
827 )
828 except Exception as e:
829 logger.debug(
830 "Memory graph save failed: user=%s channel=%s err=%s",
831 user_id, channel, e,
832 )
834 def _emit_perception_event(self, user_id: str, description: str, channel: str):
835 """Emit present-tense perception event on EventBus."""
836 try:
837 from core.platform.events import emit_event
838 emit_event('perception.vision.present', {
839 'user_id': user_id, 'channel': channel,
840 'content': description, 'timestamp': time.time(),
841 })
842 except Exception:
843 pass
845 # ─── Config ───
847 def _load_config(self, config_path: Optional[str]) -> Dict:
848 """Load embodied AI config if available."""
849 if config_path and os.path.isfile(config_path):
850 try:
851 with open(config_path) as f:
852 return json.load(f)
853 except Exception:
854 pass
856 default = os.path.join(
857 os.path.expanduser('~'), '.hevolve', 'embodied_ai_config.json'
858 )
859 if os.path.isfile(default):
860 try:
861 with open(default) as f:
862 return json.load(f)
863 except Exception:
864 pass
866 return {}