Coverage for integrations / remote_desktop / security.py: 72.6%

73 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Remote Desktop Security — Auth, audit, DLP, input classification, E2E encryption. 

3 

4Reuses existing HARTOS security infrastructure: 

5 - security/channel_encryption.py → encrypt_for_peer() for E2E frame encryption 

6 - security/immutable_audit_log.py → log_event() for session audit trail 

7 - security/action_classifier.py → classify_action() for destructive input gating 

8 - security/dlp_engine.py → check_outbound() for file transfer DLP 

9 - security/rate_limiter_redis.py → rate_limit() for connection attempt limiting 

10 - integrations/social/auth.py → generate_token_pair() for JWT session tokens 

11 - integrations/social/services.py → NotificationService.create() for connection requests 

12""" 

13 

14import logging 

15import time 

16from typing import Any, Dict, Optional, Tuple 

17 

18logger = logging.getLogger('hevolve.remote_desktop') 

19 

20 

21# ── Authentication ────────────────────────────────────────────── 

22 

def authenticate_connection(host_device_id: str, viewer_device_id: str,
                            password: str,
                            host_user_id: Optional[str] = None,
                            viewer_user_id: Optional[str] = None
                            ) -> Tuple[bool, str]:
    """Decide whether a viewer may open a remote desktop session on a host.

    Two paths exist:

    * Same user on both ends: accepted immediately, no OTP needed
      (compute_mesh_service.py:398 pattern).
    * Different users: the supplied password is verified as the host
      device's OTP.

    Args:
        host_device_id: Device being connected to; the OTP is verified
            against this id.
        viewer_device_id: Device requesting the connection (only logged).
        password: One-time password supplied by the viewer.
        host_user_id: Owner of the host device, if known.
        viewer_user_id: Owner of the viewer device, if known.

    Returns:
        (success, reason), where reason is one of 'same_user_auto_accept',
        'otp_verified', 'password_required' or 'invalid_password'.
    """
    # Resolved at call time rather than at module import.
    from integrations.remote_desktop.session_manager import get_session_manager

    manager = get_session_manager()

    # NOTE(review): the auto-accept relies entirely on is_same_user(); confirm
    # it rejects missing (None) user ids so anonymous peers cannot skip the OTP.
    if manager.is_same_user(host_user_id, viewer_user_id):
        logger.info(
            f"Same-user auto-accept: host={host_device_id[:8]}, "
            f"viewer={viewer_device_id[:8]}"
        )
        return True, 'same_user_auto_accept'

    # Cross-user path: a password is mandatory before the OTP is even checked.
    if not password:
        return False, 'password_required'

    otp_ok = manager.verify_otp(host_device_id, password)
    return (True, 'otp_verified') if otp_ok else (False, 'invalid_password')

56 

57 

def generate_session_token(session_id: str, device_id: str,
                           user_id: Optional[str] = None) -> Optional[str]:
    """Issue a token for a persistent remote desktop session.

    Preferred path: a JWT access token from
    integrations/social/auth.py generate_token_pair(), carrying the session
    id, device id and a 'remote_desktop' type marker as extra claims.
    Fallback path: if anything in the JWT path raises (including the auth
    module being unavailable), a random URL-safe token is returned instead.

    Args:
        session_id: Remote desktop session the token belongs to.
        device_id: Device the token is issued for.
        user_id: Owning user; only all-digit strings are converted and
            forwarded, anything else is sent as user id 0.

    Returns:
        The 'access_token' entry of the generated pair (whatever ``.get``
        yields, possibly None), or a random fallback token string.
    """
    try:
        from integrations.social.auth import generate_token_pair

        numeric_user_id = 0
        if user_id and user_id.isdigit():
            numeric_user_id = int(user_id)

        claims = {
            'session_id': session_id,
            'device_id': device_id,
            'type': 'remote_desktop',
        }
        pair = generate_token_pair(user_id=numeric_user_id,
                                   extra_claims=claims)
        return pair.get('access_token')
    except Exception as e:
        logger.debug(f"JWT auth unavailable, using simple token: {e}")
        import secrets
        return secrets.token_urlsafe(32)

80 

81 

82# ── Notifications ─────────────────────────────────────────────── 

83 

def notify_connection_request(source_user_id: str, target_user_id: str,
                              source_device_id: str) -> bool:
    """Notify the target user that someone wants to connect to them.

    Creates a 'remote_desktop_request' notification through
    NotificationService.create() (integrations/social/services.py) inside a
    db_session() context and commits it.

    Args:
        source_user_id: User asking to connect; forwarded as sender only
            when it is an all-digit string, otherwise the sender is None.
        target_user_id: User to notify; converted with int().
        source_device_id: Requesting device; its first 8 characters are
            shown in the notification text.

    Returns:
        True once the notification has been committed, False if any step
        raised (the failure is logged as a warning, never propagated).
    """
    try:
        from integrations.social.services import NotificationService
        from integrations.social.models import db_session

        with db_session() as db:
            # Arguments are built inside the session, in call order.
            recipient = int(target_user_id)
            sender = int(source_user_id) if source_user_id.isdigit() else None
            NotificationService.create(
                db=db,
                recipient_id=recipient,
                sender_id=sender,
                notification_type='remote_desktop_request',
                content=f"Remote desktop connection request from device {source_device_id[:8]}...",
                data={
                    'source_device_id': source_device_id,
                    'source_user_id': source_user_id,
                    'action': 'remote_desktop_connect',
                },
            )
            db.commit()
    except Exception as e:
        logger.warning(f"Notification failed (service unavailable): {e}")
        return False

    logger.info(
        f"Connection request notification sent: "
        f"user {source_user_id} → user {target_user_id}"
    )
    return True

115 

116 

117# ── Audit Logging ─────────────────────────────────────────────── 

118 

def audit_session_event(event_type: str, session_id: str,
                        actor_id: str, detail: Optional[Dict] = None
                        ) -> Optional[Tuple[int, str]]:
    """Record a remote desktop event in the immutable audit log.

    Delegates to log_event() on the object returned by
    security/immutable_audit_log.py get_audit_log().

    Args:
        event_type: Short event name; logged as 'remote_desktop.<event_type>'
            with action 'remote_desktop_<event_type>'.
        session_id: Session the event belongs to; used as the target id and
            always written into the detail under 'session_id' (overriding
            any caller-supplied value for that key).
        actor_id: Who triggered the event.
        detail: Optional extra fields; the caller's dict is not mutated.

    Returns:
        Whatever log_event() returns, or None if the audit log is
        unavailable or raised (logged at debug level).
    """
    try:
        from security.immutable_audit_log import get_audit_log

        audit_log = get_audit_log()
        event_name = f'remote_desktop.{event_type}'
        action_name = f'remote_desktop_{event_type}'
        payload = {**(detail or {}), 'session_id': session_id}
        result = audit_log.log_event(
            event_type=event_name,
            actor_id=actor_id,
            action=action_name,
            detail=payload,
            target_id=session_id,
        )
    except Exception as e:
        logger.debug(f"Audit log unavailable: {e}")
        result = None
    return result

139 

140 

141# ── Input Classification ──────────────────────────────────────── 

142 

# Alternate spellings that name the same key.
_HOTKEY_ALIASES = {'control': 'ctrl', 'esc': 'escape', 'del': 'delete'}

# Known destructive keyboard shortcuts.
_DESTRUCTIVE_HOTKEYS = (
    'alt+f4', 'ctrl+alt+delete', 'ctrl+alt+del',
    'super+l', 'ctrl+shift+escape', 'alt+tab',
)

# Mouse events are inherently safe (click, move, scroll, drag).
_SAFE_INPUT_TYPES = frozenset({
    'click', 'rightclick', 'doubleclick', 'middleclick',
    'move', 'drag', 'scroll', 'mouse_move', 'mouse_down',
    'mouse_up', 'cursor',
})


def _hotkey_combo(hotkey) -> frozenset:
    """Return a hotkey as a case-, spacing- and order-insensitive key set."""
    keys = (part.strip().lower() for part in str(hotkey).split('+'))
    return frozenset(_HOTKEY_ALIASES.get(k, k) for k in keys if k)


_DESTRUCTIVE_COMBOS = frozenset(
    _hotkey_combo(combo) for combo in _DESTRUCTIVE_HOTKEYS
)


def classify_remote_input(action: dict) -> str:
    """Classify a remote input action for safety gating.

    Checks run in this order:

    1. A hotkey matching a known destructive shortcut -> 'destructive'.
       Matching ignores case, spaces around '+', key order and the aliases
       control/esc/del, so "Alt + F4" and "delete+alt+ctrl" are caught.
       A list or tuple of keys is accepted and joined with '+'.
    2. A mouse-style action type -> 'safe'.
    3. Otherwise the result of security/action_classifier.py
       classify_action() on "<type> <key> <text> <hotkey>".
    4. If the classifier is unavailable or raises: 'unknown' for hotkeys
       and for type 'key', 'safe' for everything else.

    Args:
        action: Input event dict; the optional keys 'type', 'key', 'text'
            and 'hotkey' are read. Missing keys and explicit None values
            are both treated as empty.

    Returns:
        'safe', 'destructive' or 'unknown' from the local checks, or
        whatever classify_action() returns when it is consulted.
    """
    def _field(name):
        # Treat an explicit None like a missing key so it never reaches the
        # classifier text as the literal string "None".
        value = action.get(name)
        return '' if value is None else value

    action_type = _field('type')
    key = _field('key')
    text = _field('text')
    hotkey = _field('hotkey')
    if isinstance(hotkey, (list, tuple)):
        hotkey = '+'.join(str(part) for part in hotkey)

    if hotkey and _hotkey_combo(hotkey) in _DESTRUCTIVE_COMBOS:
        return 'destructive'

    # isinstance guard: a non-hashable 'type' must not crash the gate.
    if isinstance(action_type, str) and action_type in _SAFE_INPUT_TYPES:
        return 'safe'

    # Delegate to action_classifier for text/key-based classification.
    try:
        from security.action_classifier import classify_action
        action_text = f"{action_type} {key} {text} {hotkey}".strip()
        if action_text:
            return classify_action(action_text)
    except Exception as e:
        # Best-effort: fall through to the conservative default below.
        logger.debug(f"Action classifier unavailable: {e}")

    # Classifier unavailable: unknown for key events, safe for others.
    return 'unknown' if hotkey or action_type == 'key' else 'safe'

184 

185 

186# ── DLP (Data Loss Prevention) ────────────────────────────────── 

187 

def scan_file_transfer(filename: str,
                       content_preview: Optional[str] = None
                       ) -> Tuple[bool, str]:
    """Run an outgoing file through the DLP engine before transfer.

    Uses check_outbound() on the engine from security/dlp_engine.py
    get_dlp_engine(). The filename is scanned first; the content preview is
    scanned only if one is given and the filename passed.

    Args:
        filename: Name of the file being transferred.
        content_preview: Optional excerpt of the file's content.

    Returns:
        (allowed, reason):
        * (False, "Filename blocked: ...") or (False, "Content blocked: ...")
          when the engine rejects either part.
        * (True, '') when everything passed.
        * (True, 'dlp_unavailable') when the engine could not be used; the
          scan fails open and the error is logged at debug level.
    """
    try:
        from security.dlp_engine import get_dlp_engine

        engine = get_dlp_engine()

        ok, reason = engine.check_outbound(filename)
        if not ok:
            return False, f"Filename blocked: {reason}"

        # Nothing further to inspect when no preview was supplied.
        if not content_preview:
            return True, ''

        ok, reason = engine.check_outbound(content_preview)
        if not ok:
            return False, f"Content blocked: {reason}"
        return True, ''
    except Exception as e:
        logger.debug(f"DLP engine unavailable: {e}")
        return True, 'dlp_unavailable'

217 

218 

def scan_clipboard(text: str) -> Tuple[bool, str]:
    """Scan clipboard content before sending it to the remote device.

    Delegates to check_outbound() on the engine from
    security/dlp_engine.py get_dlp_engine().

    Args:
        text: Clipboard text about to be sent.

    Returns:
        (allowed, reason) as reported by the DLP engine. If the engine is
        unavailable or raises, the scan fails open with (True, '') and the
        error is logged at debug level, as scan_file_transfer() does,
        instead of being dropped silently.
    """
    try:
        from security.dlp_engine import get_dlp_engine
        dlp = get_dlp_engine()
        return dlp.check_outbound(text)
    except Exception as e:
        # Deliberate best-effort: clipboard sync must not break without DLP.
        logger.debug(f"DLP engine unavailable: {e}")
        return True, ''

231 

232 

233# ── E2E Encryption ────────────────────────────────────────────── 

234 

def encrypt_frame(frame_bytes: bytes,
                  peer_x25519_public_hex: str) -> Optional[Dict[str, str]]:
    """Encrypt one frame for a peer (X25519 + AES-256-GCM).

    Thin wrapper over security/channel_encryption.py encrypt_for_peer().

    Args:
        frame_bytes: Raw frame data to protect.
        peer_x25519_public_hex: The peer's X25519 public key, hex-encoded.

    Returns:
        The envelope dict {eph, nonce, ct, v} produced by encrypt_for_peer(),
        or None if the encryption module is unavailable or raised (logged at
        debug level).
    """
    try:
        from security.channel_encryption import encrypt_for_peer
        envelope = encrypt_for_peer(frame_bytes, peer_x25519_public_hex)
    except Exception as e:
        logger.debug(f"Frame encryption failed: {e}")
        envelope = None
    return envelope

250 

251 

def decrypt_frame(envelope: Dict[str, str]) -> Optional[bytes]:
    """Decrypt a frame envelope received from a peer.

    Thin wrapper over security/channel_encryption.py decrypt_from_peer().

    Args:
        envelope: Encrypted envelope as received from the peer.

    Returns:
        The decrypted bytes, or None if the decryption module is
        unavailable or raised (logged at debug level).
    """
    try:
        from security.channel_encryption import decrypt_from_peer
        plaintext = decrypt_from_peer(envelope)
    except Exception as e:
        logger.debug(f"Frame decryption failed: {e}")
        plaintext = None
    return plaintext

264 

265 

def encrypt_event(event: dict,
                  peer_x25519_public_hex: str) -> Optional[Dict[str, str]]:
    """Encrypt a JSON event (input/clipboard/control) for a peer.

    Thin wrapper over security/channel_encryption.py encrypt_json_for_peer().

    Args:
        event: Event payload to protect.
        peer_x25519_public_hex: The peer's X25519 public key, hex-encoded.

    Returns:
        The envelope returned by encrypt_json_for_peer(), or None if the
        encryption module is unavailable or raised (logged at debug level).
    """
    try:
        from security.channel_encryption import encrypt_json_for_peer
        envelope = encrypt_json_for_peer(event, peer_x25519_public_hex)
    except Exception as e:
        logger.debug(f"Event encryption failed: {e}")
        envelope = None
    return envelope

278 

279 

def decrypt_event(envelope: Dict[str, str]) -> Optional[dict]:
    """Decrypt a JSON event envelope received from a peer.

    Thin wrapper over security/channel_encryption.py decrypt_json_from_peer().

    Args:
        envelope: Encrypted envelope as received from the peer.

    Returns:
        The decoded event, or None if the decryption module is unavailable
        or raised (logged at debug level).
    """
    try:
        from security.channel_encryption import decrypt_json_from_peer
        event = decrypt_json_from_peer(envelope)
    except Exception as e:
        logger.debug(f"Event decryption failed: {e}")
        event = None
    return event