Coverage for integrations / remote_desktop / window_session.py: 45.7%

127 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Window Session Manager — Multi-window streaming sessions for tab-detach. 

3 

4Each remote application window can be streamed as an independent session. 

5The viewer sees separate "tabs" for each window, and can detach them 

6into standalone viewers. 

7 

8Reuses: 

9 - window_capture.py: WindowEnumerator, WindowCapture, WindowInfo 

10 - session_manager.py: SessionManager for auth/lifecycle 

11 - transport.py: TransportChannel for frame delivery 

12""" 

13 

14import logging 

15import threading 

16import time 

17import uuid 

18from dataclasses import dataclass, field 

19from typing import Callable, Dict, List, Optional 

20 

21logger = logging.getLogger('hevolve.remote_desktop') 

22 

23 

24@dataclass 

25class WindowSession: 

26 """One streaming session bound to one OS window.""" 

27 session_id: str 

28 window_hwnd: int 

29 window_title: str 

30 process_name: str 

31 started_at: float 

32 viewer_count: int = 0 

33 capture_thread: Optional[threading.Thread] = field(default=None, repr=False) 

34 running: bool = False 

35 

36 def to_dict(self) -> dict: 

37 return { 

38 'session_id': self.session_id, 

39 'window_hwnd': self.window_hwnd, 

40 'window_title': self.window_title, 

41 'process_name': self.process_name, 

42 'started_at': self.started_at, 

43 'viewer_count': self.viewer_count, 

44 'running': self.running, 

45 } 

46 

47 

48class WindowSessionManager: 

49 """Manages multiple concurrent window capture sessions. 

50 

51 Integrates with existing SessionManager for auth/lifecycle but adds 

52 window-level granularity. Each window session has its own capture thread 

53 and transport channel. 

54 """ 

55 

56 def __init__(self): 

57 self._sessions: Dict[str, WindowSession] = {} 

58 self._captures: Dict[str, object] = {} # session_id → WindowCapture 

59 self._transports: Dict[str, object] = {} # session_id → TransportChannel 

60 self._lock = threading.Lock() 

61 self._frame_callbacks: Dict[str, Callable] = {} 

62 

63 def list_available_windows(self) -> List[dict]: 

64 """Enumerate host windows available for streaming. 

65 

66 Returns list of WindowInfo dicts (excludes system/shell windows). 

67 """ 

68 try: 

69 from integrations.remote_desktop.window_capture import WindowEnumerator 

70 enum = WindowEnumerator() 

71 windows = enum.list_windows(include_minimized=False) 

72 # Filter out trivial system windows 

73 filtered = [] 

74 for w in windows: 

75 # Skip windows with very small dimensions 

76 _, _, width, height = w.rect 

77 if width < 50 or height < 50: 

78 continue 

79 filtered.append(w.to_dict()) 

80 return filtered 

81 except Exception as e: 

82 logger.warning(f"Window enumeration failed: {e}") 

83 return [] 

84 

85 def start_window_session(self, window_hwnd: int, 

86 window_title: str = '', 

87 process_name: str = '', 

88 transport=None, 

89 user_id: Optional[str] = None) -> dict: 

90 """Start streaming a specific window. 

91 

92 Args: 

93 window_hwnd: OS window handle to capture. 

94 window_title: Window title (for display). 

95 process_name: Process name (for display). 

96 transport: TransportChannel to send frames on. 

97 user_id: User starting the session. 

98 

99 Returns: 

100 {session_id, status, window_title, ...} 

101 """ 

102 session_id = f"win-{uuid.uuid4().hex[:12]}" 

103 

104 # Create WindowCapture 

105 try: 

106 from integrations.remote_desktop.window_capture import ( 

107 WindowInfo, WindowCapture, WindowCaptureConfig, 

108 ) 

109 winfo = WindowInfo( 

110 hwnd=window_hwnd, 

111 title=window_title, 

112 process_name=process_name, 

113 pid=0, 

114 rect=(0, 0, 0, 0), 

115 ) 

116 # Try to get real rect via refresh 

117 from integrations.remote_desktop.window_capture import WindowEnumerator 

118 enum = WindowEnumerator() 

119 refreshed = enum.refresh_window_info(winfo) 

120 if refreshed: 

121 winfo = refreshed 

122 

123 capture = WindowCapture(winfo, WindowCaptureConfig()) 

124 except Exception as e: 

125 return {'status': 'error', 'error': f'Window capture init failed: {e}'} 

126 

127 ws = WindowSession( 

128 session_id=session_id, 

129 window_hwnd=window_hwnd, 

130 window_title=winfo.title or window_title, 

131 process_name=winfo.process_name or process_name, 

132 started_at=time.time(), 

133 running=True, 

134 ) 

135 

136 with self._lock: 

137 self._sessions[session_id] = ws 

138 self._captures[session_id] = capture 

139 

140 if transport: 

141 self._transports[session_id] = transport 

142 

143 # Start capture thread 

144 thread = threading.Thread( 

145 target=self._capture_loop, 

146 args=(session_id,), 

147 daemon=True, 

148 name=f'wincap-{session_id[:8]}', 

149 ) 

150 ws.capture_thread = thread 

151 thread.start() 

152 

153 logger.info(f"Window session started: {ws.window_title} " 

154 f"({session_id[:8]})") 

155 

156 return { 

157 'status': 'streaming', 

158 'session_id': session_id, 

159 'window_title': ws.window_title, 

160 'process_name': ws.process_name, 

161 'window_hwnd': window_hwnd, 

162 } 

163 

164 def stop_window_session(self, session_id: str) -> bool: 

165 """Stop a window streaming session.""" 

166 with self._lock: 

167 ws = self._sessions.get(session_id) 

168 if not ws: 

169 return False 

170 ws.running = False 

171 

172 # Wait for capture thread to finish 

173 if ws.capture_thread: 

174 ws.capture_thread.join(timeout=5) 

175 

176 # Cleanup 

177 capture = self._captures.pop(session_id, None) 

178 if capture: 

179 capture.stop() 

180 self._transports.pop(session_id, None) 

181 self._frame_callbacks.pop(session_id, None) 

182 

183 with self._lock: 

184 self._sessions.pop(session_id, None) 

185 

186 logger.info(f"Window session stopped: {session_id[:8]}") 

187 return True 

188 

189 def get_active_window_sessions(self) -> List[dict]: 

190 """Get all active window sessions as dicts.""" 

191 with self._lock: 

192 return [ws.to_dict() for ws in self._sessions.values() 

193 if ws.running] 

194 

195 def get_session(self, session_id: str) -> Optional[dict]: 

196 """Get a specific session.""" 

197 with self._lock: 

198 ws = self._sessions.get(session_id) 

199 return ws.to_dict() if ws else None 

200 

201 def on_frame(self, session_id: str, 

202 callback: Callable[[str, bytes], None]) -> None: 

203 """Register a callback for frames from a window session. 

204 

205 callback(session_id, jpeg_bytes) 

206 """ 

207 self._frame_callbacks[session_id] = callback 

208 

209 def detach_window(self, session_id: str) -> dict: 

210 """'Tab detach' — return connection info for a standalone viewer. 

211 

212 The window session continues running; this just provides the info 

213 needed for a separate viewer instance to connect to it. 

214 """ 

215 with self._lock: 

216 ws = self._sessions.get(session_id) 

217 if not ws: 

218 return {'status': 'error', 'error': 'Session not found'} 

219 if not ws.running: 

220 return {'status': 'error', 'error': 'Session not running'} 

221 

222 return { 

223 'status': 'detached', 

224 'session_id': session_id, 

225 'window_title': ws.window_title, 

226 'process_name': ws.process_name, 

227 'window_hwnd': ws.window_hwnd, 

228 } 

229 

230 def stop_all(self) -> int: 

231 """Stop all window sessions. Returns count stopped.""" 

232 with self._lock: 

233 session_ids = list(self._sessions.keys()) 

234 count = 0 

235 for sid in session_ids: 

236 if self.stop_window_session(sid): 

237 count += 1 

238 return count 

239 

240 def _capture_loop(self, session_id: str) -> None: 

241 """Capture loop for a window session — sends frames via transport/callback.""" 

242 capture = self._captures.get(session_id) 

243 if not capture: 

244 return 

245 

246 ws = self._sessions.get(session_id) 

247 if not ws: 

248 return 

249 

250 logger.debug(f"Window capture loop started: {session_id[:8]}") 

251 for frame in capture.capture_loop(): 

252 if not ws.running: 

253 break 

254 

255 # Send via transport if available 

256 transport = self._transports.get(session_id) 

257 if transport and hasattr(transport, 'send_frame'): 

258 try: 

259 transport.send_frame(frame) 

260 except Exception: 

261 pass 

262 

263 # Notify callback if registered 

264 callback = self._frame_callbacks.get(session_id) 

265 if callback: 

266 try: 

267 callback(session_id, frame) 

268 except Exception: 

269 pass 

270 

271 logger.debug(f"Window capture loop ended: {session_id[:8]}") 

272 

273 

274# ── Singleton ──────────────────────────────────────────────── 

275 

276_window_session_manager: Optional[WindowSessionManager] = None 

277 

278 

279def get_window_session_manager() -> WindowSessionManager: 

280 """Get or create the singleton WindowSessionManager.""" 

281 global _window_session_manager 

282 if _window_session_manager is None: 

283 _window_session_manager = WindowSessionManager() 

284 return _window_session_manager