Coverage for integrations / vision / frame_store.py: 97.6%
124 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Thread-safe in-process frame store — replaces Redis for desktop deployment.

Stores raw frame bytes and text descriptions per user_id, with separate
channels for camera and screen feeds.

Camera descriptions are the longer-lived context (persisting them to a DB
happens outside this module); screen descriptions are kept only for a short
TTL because they go stale fast.

Re-exports compute_frame_difference and decode_jpeg from HevolveAI's
visual_encoding utilities (the canonical source), falling back to local
numpy/OpenCV implementations when HevolveAI cannot be loaded.
"""
import threading
import time
from collections import deque
from typing import Optional, Dict, Any, List, Tuple

# Canonical frame utilities live in HevolveAI (downstream dep).
# Re-export here so VisionService imports stay clean.
#
# We go through security.native_hive_loader.try_import_hevolveai_names so
# that an armored bundle (HevolveArmor) is transparently unlocked before
# the import — the scattered bare `import hevolveai.X` probes used to miss
# the armored path when a key was needed.
import logging as _logging
_logger = _logging.getLogger(__name__)

# Unpacked below into (compute_frame_difference, decode_jpeg); None means
# HevolveAI could not be loaded and the local fallbacks are used instead.
_visual_encoding_result = None
try:
    from security.native_hive_loader import try_import_hevolveai_names
    _visual_encoding_result = try_import_hevolveai_names(
        'hevolveai.embodied_ai.utils.visual_encoding',
        ('compute_frame_difference', 'decode_jpeg'),
    )
except ImportError:
    # security package itself unavailable — treat as missing HevolveAI
    _visual_encoding_result = None

if _visual_encoding_result is not None:
    compute_frame_difference, decode_jpeg = _visual_encoding_result
else:
    # Fallback if HevolveAI not installed (e.g. tests without full deps,
    # or armored bundle present but key unavailable)
    _logger.warning(
        "HevolveAI not installed/armored — visual_encoding using numpy fallback")
    import numpy as np

    def compute_frame_difference(frame1: 'np.ndarray', frame2: 'np.ndarray') -> float:
        """Return the mean absolute element-wise difference scaled by 1/255.

        Identical frames give 0.0; two uint8 frames differing by 255 in every
        element give 1.0. The arrays must be broadcast-compatible.
        """
        diff = np.abs(frame1.astype(np.float32) - frame2.astype(np.float32))
        return float(diff.mean() / 255.0)

    def decode_jpeg(frame_bytes: bytes) -> Optional['np.ndarray']:
        """Decode encoded image bytes with OpenCV (IMREAD_COLOR), or return None.

        Best-effort: returns None when OpenCV (cv2) is not installed or when
        decoding fails. Each case is logged at DEBUG so a missing dependency
        is distinguishable from bad input.
        """
        try:
            import cv2
        except ImportError:
            _logger.debug("decode_jpeg fallback: cv2 not installed")
            return None
        try:
            # cv2.imdecode itself returns None for undecodable data; other
            # failures (e.g. empty buffer, non-buffer input) raise.
            return cv2.imdecode(np.frombuffer(frame_bytes, np.uint8), cv2.IMREAD_COLOR)
        except Exception:
            _logger.debug("decode_jpeg fallback: decoding failed", exc_info=True)
            return None
class FrameStore:
    """Thread-safe in-memory frame + description store (replaces Redis for desktop apps).

    Two independent visual channels are kept per ``user_id``:

    - **camera**: physical camera frames and their scene descriptions.
    - **screen**: screen-capture frames and their descriptions, which use a
      separate (by default shorter) freshness TTL because they go stale fast.

    Per user and per channel the store holds:

    - a FIFO buffer of at most ``max_frames`` ``(timestamp, frame_bytes)``
      entries (oldest evicted first),
    - the latest description, returned only while younger than the channel TTL,
    - a ring buffer of the last 20 ``(timestamp, text)`` descriptions.

    Timestamps are wall-clock ``time.time()`` seconds. All access is guarded
    by a single re-entrant lock. This class does no persistence of its own;
    any DB persistence (e.g. of camera descriptions) happens outside it.
    """

    def __init__(
        self,
        max_frames: int = 5,
        description_ttl: float = 30.0,
        screen_description_ttl: float = 15.0,
    ):
        """Create an empty store.

        Args:
            max_frames: Frames retained per user per channel; older frames
                are evicted first.
            description_ttl: Seconds a camera description remains visible
                through get_description().
            screen_description_ttl: Seconds a screen description remains
                visible through get_screen_description().
        """
        self._lock = threading.RLock()
        # Camera channel (default, backward compatible)
        self._frames: Dict[str, deque] = {}        # user_id -> deque of (timestamp, frame_bytes)
        self._descriptions: Dict[str, tuple] = {}  # user_id -> (timestamp, text)
        # Screen channel (same layouts as the camera channel)
        self._screen_frames: Dict[str, deque] = {}
        self._screen_descriptions: Dict[str, tuple] = {}
        # Description history per channel: user_id -> bounded deque of (timestamp, text)
        self._camera_desc_history: Dict[str, deque] = {}
        self._screen_desc_history: Dict[str, deque] = {}
        self._max_desc_history = 20  # keep the last 20 descriptions per channel

        self._max_frames = max_frames
        self._description_ttl = description_ttl
        self._screen_description_ttl = screen_description_ttl

    # ─── Private helpers (shared by both channels) ───

    def _append_frame(
        self, buffers: Dict[str, deque], user_id: str, frame_bytes: bytes
    ) -> None:
        """Append a timestamped frame to user_id's bounded buffer in *buffers*."""
        with self._lock:
            buf = buffers.get(user_id)
            if buf is None:
                buf = buffers[user_id] = deque(maxlen=self._max_frames)
            buf.append((time.time(), frame_bytes))

    def _latest_frame(self, buffers: Dict[str, deque], user_id: str) -> Optional[bytes]:
        """Return the newest frame bytes in user_id's buffer, or None if empty/absent."""
        with self._lock:
            buf = buffers.get(user_id)
            return buf[-1][1] if buf else None

    def _frame_count(self, buffers: Dict[str, deque], user_id: str) -> int:
        """Return how many frames are buffered for user_id (0 if none)."""
        with self._lock:
            buf = buffers.get(user_id)
            return len(buf) if buf else 0

    def _record_description(
        self,
        latest: Dict[str, tuple],
        history: Dict[str, deque],
        user_id: str,
        text: str,
    ) -> None:
        """Set user_id's latest description and append it to the history ring."""
        with self._lock:
            now = time.time()
            latest[user_id] = (now, text)
            ring = history.get(user_id)
            if ring is None:
                ring = history[user_id] = deque(maxlen=self._max_desc_history)
            ring.append((now, text))

    def _fresh_description(
        self, latest: Dict[str, tuple], user_id: str, ttl: float
    ) -> Optional[str]:
        """Return user_id's latest description if at most *ttl* seconds old, else None."""
        with self._lock:
            entry = latest.get(user_id)
            if entry is None:
                return None
            ts, text = entry
            return None if time.time() - ts > ttl else text

    def _recent_descriptions(
        self, history: Dict[str, deque], user_id: str, max_age_seconds: float
    ) -> List[Tuple[float, str]]:
        """Return (timestamp, text) history entries newer than the cutoff, newest first."""
        with self._lock:
            ring = history.get(user_id)
            if not ring:
                return []
            cutoff = time.time() - max_age_seconds
            return [entry for entry in reversed(ring) if entry[0] >= cutoff]

    # ─── Camera Channel (default, backward compatible) ───

    def put_frame(self, user_id: str, frame_bytes: bytes) -> None:
        """Store a raw camera frame for a user (FIFO bounded)."""
        self._append_frame(self._frames, user_id, frame_bytes)

    def get_frame(self, user_id: str) -> Optional[bytes]:
        """Get the latest camera frame for a user, or None."""
        return self._latest_frame(self._frames, user_id)

    def get_frame_count(self, user_id: str) -> int:
        """Number of buffered camera frames for a user."""
        return self._frame_count(self._frames, user_id)

    def put_description(self, user_id: str, text: str) -> None:
        """Store a camera scene description and append it to the camera history."""
        self._record_description(
            self._descriptions, self._camera_desc_history, user_id, text
        )

    def get_description(self, user_id: str) -> Optional[str]:
        """Get the latest camera description if within TTL, else None."""
        return self._fresh_description(
            self._descriptions, user_id, self._description_ttl
        )

    # ─── Screen Channel ───

    def put_screen_frame(self, user_id: str, frame_bytes: bytes) -> None:
        """Store a raw screen capture frame for a user (FIFO bounded)."""
        self._append_frame(self._screen_frames, user_id, frame_bytes)

    def get_screen_frame(self, user_id: str) -> Optional[bytes]:
        """Get the latest screen frame for a user, or None."""
        return self._latest_frame(self._screen_frames, user_id)

    def get_screen_frame_count(self, user_id: str) -> int:
        """Number of buffered screen frames for a user."""
        return self._frame_count(self._screen_frames, user_id)

    def put_screen_description(self, user_id: str, text: str) -> None:
        """Store a screen description and append it to the screen history."""
        self._record_description(
            self._screen_descriptions, self._screen_desc_history, user_id, text
        )

    def get_screen_description(self, user_id: str) -> Optional[str]:
        """Get the latest screen description if within the screen TTL, else None."""
        return self._fresh_description(
            self._screen_descriptions, user_id, self._screen_description_ttl
        )

    def get_screen_description_history(
        self, user_id: str, max_age_seconds: float = 60.0
    ) -> List[Tuple[float, str]]:
        """Get recent screen descriptions within max_age_seconds.

        Returns list of (timestamp, text) tuples, newest first.
        """
        return self._recent_descriptions(
            self._screen_desc_history, user_id, max_age_seconds
        )

    def get_camera_description_history(
        self, user_id: str, max_age_seconds: float = 300.0
    ) -> List[Tuple[float, str]]:
        """Get recent camera descriptions within max_age_seconds.

        Returns list of (timestamp, text) tuples, newest first.
        """
        return self._recent_descriptions(
            self._camera_desc_history, user_id, max_age_seconds
        )

    # ─── Shared ───

    def clear_user(self, user_id: str) -> None:
        """Remove all data for a user (both channels, including history)."""
        with self._lock:
            for store in (
                self._frames,
                self._descriptions,
                self._screen_frames,
                self._screen_descriptions,
                self._camera_desc_history,
                self._screen_desc_history,
            ):
                store.pop(user_id, None)

    def active_users(self) -> List[str]:
        """Return user_ids with stored frames in either channel (unordered)."""
        with self._lock:
            return list(self._frames.keys() | self._screen_frames.keys())

    def stats(self) -> Dict[str, Any]:
        """Return store statistics (user, frame and description counts)."""
        with self._lock:
            camera_frames = sum(len(buf) for buf in self._frames.values())
            screen_frames = sum(len(buf) for buf in self._screen_frames.values())
            return {
                'active_users': len(
                    self._frames.keys() | self._screen_frames.keys()
                ),
                'camera_frames': camera_frames,
                'screen_frames': screen_frames,
                'total_frames': camera_frames + screen_frames,
                'camera_descriptions': len(self._descriptions),
                'screen_descriptions': len(self._screen_descriptions),
            }