Coverage for integrations / vision / frame_store.py: 97.6%

124 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Thread-safe in-process frame store — replaces Redis for desktop deployment. 

3 

4Stores raw frames (numpy arrays) and text descriptions per user_id, 

5with separate channels for camera and screen feeds. 

6 

7Camera descriptions are also persisted to DB (longer-lived context). 

8Screen descriptions are short-lived TTL only (they go stale fast). 

9 

10Re-exports compute_frame_difference and decode_jpeg from 

11HevolveAI's visual_encoding utilities (canonical source). 

12""" 

13import threading 

14import time 

15from collections import deque 

16from typing import Optional, Dict, Any, List, Tuple 

17 

18# Canonical frame utilities live in HevolveAI (downstream dep). 

19# Re-export here so VisionService imports stay clean. 

20# 

21# We go through security.native_hive_loader.try_import_hevolveai_names so 

22# that an armored bundle (HevolveArmor) is transparently unlocked before 

23# the import — the scattered bare `import hevolveai.X` probes used to miss 

24# the armored path when a key was needed. 

import logging as _logging

_logger = _logging.getLogger(__name__)

# Resolve the canonical helpers through the armor-aware loader. The result
# stays None when the loader module cannot be imported, or when the loader
# call itself raises ImportError.
_visual_encoding_result = None
try:
    from security.native_hive_loader import try_import_hevolveai_names
    _visual_encoding_result = try_import_hevolveai_names(
        'hevolveai.embodied_ai.utils.visual_encoding',
        ('compute_frame_difference', 'decode_jpeg'),
    )
except ImportError:
    # security package itself unavailable — treat as missing HevolveAI
    # (the result is still None at this point, nothing to reset).
    pass

if _visual_encoding_result is None:
    # No canonical implementation could be resolved (e.g. tests without the
    # full deps, or an armored bundle whose key is unavailable): define
    # local stand-ins under the same public names.
    _logger.warning(
        "HevolveAI not installed/armored — visual_encoding using numpy fallback")
    import numpy as np

    def compute_frame_difference(frame1: 'np.ndarray', frame2: 'np.ndarray') -> float:
        """Return the mean absolute element difference of two frames, divided by 255.

        Both inputs are cast to float32 before subtracting, so unsigned
        integer frames cannot wrap around.
        """
        first = frame1.astype(np.float32)
        second = frame2.astype(np.float32)
        mean_abs_delta = np.abs(first - second).mean()
        return float(mean_abs_delta / 255.0)

    def decode_jpeg(frame_bytes: bytes) -> Optional['np.ndarray']:
        """Decode encoded image bytes with OpenCV as a colour image.

        Returns whatever ``cv2.imdecode`` yields, or None if anything in the
        attempt raises — including ``cv2`` itself not being importable.
        """
        try:
            import cv2
            encoded = np.frombuffer(frame_bytes, np.uint8)
            return cv2.imdecode(encoded, cv2.IMREAD_COLOR)
        except Exception:
            # Deliberately best-effort: callers get None instead of an error.
            return None
else:
    # The loader returned the two helpers in the order they were requested.
    compute_frame_difference, decode_jpeg = _visual_encoding_result

58 

59 

class FrameStore:
    """Thread-safe frame + description store, replaces Redis for desktop apps.

    Supports two visual channels per user:
    - **camera**: physical camera frames and their scene descriptions
    - **screen**: screen capture frames; descriptions default to a shorter TTL

    Each user_id gets a bounded frame buffer, a latest description and a
    bounded description history per channel. All access is guarded by a
    single RLock. This class only keeps data in memory; any persistence of
    descriptions elsewhere is the caller's responsibility.
    """

    def __init__(
        self,
        max_frames: int = 5,
        description_ttl: float = 30.0,
        screen_description_ttl: float = 15.0,
        max_desc_history: int = 20,
    ):
        """Create an empty store.

        Args:
            max_frames: Frames kept per user and channel; older frames are
                dropped first once the buffer is full.
            description_ttl: Seconds a camera description stays readable
                through ``get_description``.
            screen_description_ttl: Seconds a screen description stays
                readable through ``get_screen_description``.
            max_desc_history: Descriptions kept per user and channel in the
                history buffers (previously fixed at 20).
        """
        self._lock = threading.RLock()
        # Camera channel (default, backward compatible)
        self._frames: Dict[str, deque] = {}  # user_id → deque of (timestamp, frame_bytes)
        self._descriptions: Dict[str, tuple] = {}  # user_id → (timestamp, text)
        # Screen channel (same layouts as the camera channel)
        self._screen_frames: Dict[str, deque] = {}
        self._screen_descriptions: Dict[str, tuple] = {}
        # Description history: user_id → bounded deque of (timestamp, text)
        self._camera_desc_history: Dict[str, deque] = {}
        self._screen_desc_history: Dict[str, deque] = {}
        self._max_desc_history = max_desc_history

        self._max_frames = max_frames
        self._description_ttl = description_ttl
        self._screen_description_ttl = screen_description_ttl

    # ─── Internal helpers (callers must hold self._lock) ───

    @staticmethod
    def _push(buffers: Dict[str, deque], user_id: str, maxlen: int, item: tuple) -> None:
        """Append item to the user's bounded deque, creating it on first use."""
        buf = buffers.get(user_id)
        if buf is None:
            buf = buffers[user_id] = deque(maxlen=maxlen)
        buf.append(item)

    @staticmethod
    def _latest_frame(buffers: Dict[str, deque], user_id: str) -> Optional[bytes]:
        """Return the payload of the newest buffered frame, or None."""
        buf = buffers.get(user_id)
        # A missing and an empty buffer are treated alike.
        return buf[-1][1] if buf else None

    def _record_description(
        self,
        latest: Dict[str, tuple],
        histories: Dict[str, deque],
        user_id: str,
        text: str,
    ) -> None:
        """Set the latest description and append it to the history buffer."""
        now = time.time()
        latest[user_id] = (now, text)
        self._push(histories, user_id, self._max_desc_history, (now, text))

    @staticmethod
    def _fresh_description(
        latest: Dict[str, tuple], user_id: str, ttl: float
    ) -> Optional[str]:
        """Return the latest description unless it is older than ttl seconds."""
        entry = latest.get(user_id)
        if entry is None:
            return None
        ts, text = entry
        return None if time.time() - ts > ttl else text

    @staticmethod
    def _recent_history(
        histories: Dict[str, deque], user_id: str, max_age_seconds: float
    ) -> List[Tuple[float, str]]:
        """Return history entries no older than max_age_seconds, newest first."""
        history = histories.get(user_id)
        if not history:
            return []
        cutoff = time.time() - max_age_seconds
        # Entries are appended oldest → newest, so walk the deque backwards.
        return [(ts, text) for ts, text in reversed(history) if ts >= cutoff]

    # ─── Camera Channel (default, backward compatible) ───

    def put_frame(self, user_id: str, frame_bytes: bytes):
        """Store a raw camera frame for a user (FIFO bounded)."""
        with self._lock:
            self._push(
                self._frames, user_id, self._max_frames,
                (time.time(), frame_bytes),
            )

    def get_frame(self, user_id: str) -> Optional[bytes]:
        """Get the latest camera frame for a user, or None."""
        with self._lock:
            return self._latest_frame(self._frames, user_id)

    def get_frame_count(self, user_id: str) -> int:
        """Number of buffered camera frames for a user."""
        with self._lock:
            buf = self._frames.get(user_id)
            return len(buf) if buf else 0

    def put_description(self, user_id: str, text: str):
        """Store a camera scene description for a user."""
        with self._lock:
            self._record_description(
                self._descriptions, self._camera_desc_history, user_id, text
            )

    def get_description(self, user_id: str) -> Optional[str]:
        """Get the latest camera description if within TTL, else None."""
        with self._lock:
            return self._fresh_description(
                self._descriptions, user_id, self._description_ttl
            )

    # ─── Screen Channel ───

    def put_screen_frame(self, user_id: str, frame_bytes: bytes):
        """Store a raw screen capture frame for a user (FIFO bounded)."""
        with self._lock:
            self._push(
                self._screen_frames, user_id, self._max_frames,
                (time.time(), frame_bytes),
            )

    def get_screen_frame(self, user_id: str) -> Optional[bytes]:
        """Get the latest screen frame for a user, or None."""
        with self._lock:
            return self._latest_frame(self._screen_frames, user_id)

    def put_screen_description(self, user_id: str, text: str):
        """Store a screen description for a user (shorter default TTL than camera)."""
        with self._lock:
            self._record_description(
                self._screen_descriptions, self._screen_desc_history,
                user_id, text,
            )

    def get_screen_description(self, user_id: str) -> Optional[str]:
        """Get the latest screen description if within TTL, else None."""
        with self._lock:
            return self._fresh_description(
                self._screen_descriptions, user_id,
                self._screen_description_ttl,
            )

    def get_screen_description_history(
        self, user_id: str, max_age_seconds: float = 60.0
    ) -> List[Tuple[float, str]]:
        """Get recent screen descriptions within max_age_seconds.

        Returns list of (timestamp, text) tuples, newest first.
        """
        with self._lock:
            return self._recent_history(
                self._screen_desc_history, user_id, max_age_seconds
            )

    def get_camera_description_history(
        self, user_id: str, max_age_seconds: float = 300.0
    ) -> List[Tuple[float, str]]:
        """Get recent camera descriptions within max_age_seconds.

        Returns list of (timestamp, text) tuples, newest first.
        """
        with self._lock:
            return self._recent_history(
                self._camera_desc_history, user_id, max_age_seconds
            )

    # ─── Shared ───

    def clear_user(self, user_id: str):
        """Remove all data for a user (both channels, including history)."""
        with self._lock:
            for table in (
                self._frames,
                self._descriptions,
                self._screen_frames,
                self._screen_descriptions,
                self._camera_desc_history,
                self._screen_desc_history,
            ):
                table.pop(user_id, None)

    def active_users(self) -> list:
        """Return list of user_ids with stored frames (either channel).

        Users that only have descriptions are not included.
        """
        with self._lock:
            return list(
                set(self._frames.keys()) | set(self._screen_frames.keys())
            )

    def stats(self) -> Dict[str, Any]:
        """Return store statistics (user, frame and description counts)."""
        with self._lock:
            # Sum each channel once and reuse the totals below.
            camera_total = sum(len(buf) for buf in self._frames.values())
            screen_total = sum(len(buf) for buf in self._screen_frames.values())
            return {
                'active_users': len(
                    set(self._frames.keys()) | set(self._screen_frames.keys())
                ),
                'camera_frames': camera_total,
                'screen_frames': screen_total,
                'total_frames': camera_total + screen_total,
                'camera_descriptions': len(self._descriptions),
                'screen_descriptions': len(self._screen_descriptions),
            }