Coverage for integrations / remote_desktop / security.py: 72.6%
73 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Remote Desktop Security — Auth, audit, DLP, input classification, E2E encryption.
4Reuses existing HARTOS security infrastructure:
5 - security/channel_encryption.py → encrypt_for_peer() for E2E frame encryption
6 - security/immutable_audit_log.py → log_event() for session audit trail
7 - security/action_classifier.py → classify_action() for destructive input gating
8 - security/dlp_engine.py → check_outbound() for file transfer DLP
9 - security/rate_limiter_redis.py → rate_limit() for connection attempt limiting
10 - integrations/social/auth.py → generate_token_pair() for JWT session tokens
11 - integrations/social/services.py → NotificationService.create() for connection requests
12"""
14import logging
15import time
16from typing import Any, Dict, Optional, Tuple
18logger = logging.getLogger('hevolve.remote_desktop')
21# ── Authentication ──────────────────────────────────────────────
23def authenticate_connection(host_device_id: str, viewer_device_id: str,
24 password: str,
25 host_user_id: Optional[str] = None,
26 viewer_user_id: Optional[str] = None
27 ) -> Tuple[bool, str]:
28 """Authenticate a remote desktop connection.
30 Same-user: auto-accept (no OTP needed) — compute_mesh_service.py:398 pattern.
31 Cross-user: OTP + explicit consent.
33 Returns:
34 (success, reason)
35 """
36 from integrations.remote_desktop.session_manager import get_session_manager
38 sm = get_session_manager()
40 # Same-user auto-accept
41 if sm.is_same_user(host_user_id, viewer_user_id):
42 logger.info(
43 f"Same-user auto-accept: host={host_device_id[:8]}, "
44 f"viewer={viewer_device_id[:8]}"
45 )
46 return True, 'same_user_auto_accept'
48 # Cross-user: verify OTP
49 if not password:
50 return False, 'password_required'
52 if sm.verify_otp(host_device_id, password):
53 return True, 'otp_verified'
55 return False, 'invalid_password'
58def generate_session_token(session_id: str, device_id: str,
59 user_id: Optional[str] = None) -> Optional[str]:
60 """Generate JWT token for persistent remote desktop session.
62 Reuses integrations/social/auth.py:127 generate_token_pair().
63 Falls back to simple token if auth module unavailable.
64 """
65 try:
66 from integrations.social.auth import generate_token_pair
67 token_pair = generate_token_pair(
68 user_id=int(user_id) if user_id and user_id.isdigit() else 0,
69 extra_claims={
70 'session_id': session_id,
71 'device_id': device_id,
72 'type': 'remote_desktop',
73 },
74 )
75 return token_pair.get('access_token')
76 except Exception as e:
77 logger.debug(f"JWT auth unavailable, using simple token: {e}")
78 import secrets
79 return secrets.token_urlsafe(32)
82# ── Notifications ───────────────────────────────────────────────
84def notify_connection_request(source_user_id: str, target_user_id: str,
85 source_device_id: str) -> bool:
86 """Send notification to target user about incoming connection request.
88 Reuses NotificationService.create() from integrations/social/services.py.
89 """
90 try:
91 from integrations.social.services import NotificationService
92 from integrations.social.models import db_session
93 with db_session() as db:
94 NotificationService.create(
95 db=db,
96 recipient_id=int(target_user_id),
97 sender_id=int(source_user_id) if source_user_id.isdigit() else None,
98 notification_type='remote_desktop_request',
99 content=f"Remote desktop connection request from device {source_device_id[:8]}...",
100 data={
101 'source_device_id': source_device_id,
102 'source_user_id': source_user_id,
103 'action': 'remote_desktop_connect',
104 },
105 )
106 db.commit()
107 logger.info(
108 f"Connection request notification sent: "
109 f"user {source_user_id} → user {target_user_id}"
110 )
111 return True
112 except Exception as e:
113 logger.warning(f"Notification failed (service unavailable): {e}")
114 return False
117# ── Audit Logging ───────────────────────────────────────────────
119def audit_session_event(event_type: str, session_id: str,
120 actor_id: str, detail: Optional[Dict] = None
121 ) -> Optional[Tuple[int, str]]:
122 """Log remote desktop event to immutable audit log.
124 Reuses security/immutable_audit_log.py log_event().
125 """
126 try:
127 from security.immutable_audit_log import get_audit_log
128 audit = get_audit_log()
129 return audit.log_event(
130 event_type=f'remote_desktop.{event_type}',
131 actor_id=actor_id,
132 action=f'remote_desktop_{event_type}',
133 detail={**(detail or {}), 'session_id': session_id},
134 target_id=session_id,
135 )
136 except Exception as e:
137 logger.debug(f"Audit log unavailable: {e}")
138 return None
141# ── Input Classification ────────────────────────────────────────
143def classify_remote_input(action: dict) -> str:
144 """Classify remote input action for safety gating.
146 Reuses security/action_classifier.py classify_action().
147 Destructive actions (Alt+F4, Ctrl+Alt+Del, etc.) require EXPERT tier.
149 Returns:
150 'safe', 'destructive', or 'unknown'
151 """
152 # Build text description of the action for the classifier
153 action_type = action.get('type', '')
154 key = action.get('key', '')
155 text = action.get('text', '')
156 hotkey = action.get('hotkey', '')
158 # Check for known destructive keyboard shortcuts
159 DESTRUCTIVE_HOTKEYS = {
160 'alt+f4', 'ctrl+alt+delete', 'ctrl+alt+del',
161 'super+l', 'ctrl+shift+escape', 'alt+tab',
162 }
163 if hotkey and hotkey.lower() in DESTRUCTIVE_HOTKEYS:
164 return 'destructive'
166 # Mouse events are inherently safe (click, move, scroll, drag)
167 SAFE_INPUT_TYPES = {'click', 'rightclick', 'doubleclick', 'middleclick',
168 'move', 'drag', 'scroll', 'mouse_move', 'mouse_down',
169 'mouse_up', 'cursor'}
170 if action_type in SAFE_INPUT_TYPES:
171 return 'safe'
173 # Delegate to action_classifier for text/key-based classification
174 try:
175 from security.action_classifier import classify_action
176 action_text = f"{action_type} {key} {text} {hotkey}".strip()
177 if action_text:
178 return classify_action(action_text)
179 except Exception:
180 pass
182 # If classifier unavailable, default to unknown for key events, safe for others
183 return 'unknown' if hotkey or action_type == 'key' else 'safe'
186# ── DLP (Data Loss Prevention) ──────────────────────────────────
188def scan_file_transfer(filename: str,
189 content_preview: Optional[str] = None
190 ) -> Tuple[bool, str]:
191 """Scan file transfer for PII/sensitive data.
193 Reuses security/dlp_engine.py check_outbound().
195 Returns:
196 (allowed, reason)
197 """
198 try:
199 from security.dlp_engine import get_dlp_engine
200 dlp = get_dlp_engine()
202 # Scan filename
203 allowed, reason = dlp.check_outbound(filename)
204 if not allowed:
205 return False, f"Filename blocked: {reason}"
207 # Scan content preview if available
208 if content_preview:
209 allowed, reason = dlp.check_outbound(content_preview)
210 if not allowed:
211 return False, f"Content blocked: {reason}"
213 return True, ''
214 except Exception as e:
215 logger.debug(f"DLP engine unavailable: {e}")
216 return True, 'dlp_unavailable'
219def scan_clipboard(text: str) -> Tuple[bool, str]:
220 """Scan clipboard content before sending to remote device.
222 Returns:
223 (allowed, reason)
224 """
225 try:
226 from security.dlp_engine import get_dlp_engine
227 dlp = get_dlp_engine()
228 return dlp.check_outbound(text)
229 except Exception:
230 return True, ''
233# ── E2E Encryption ──────────────────────────────────────────────
235def encrypt_frame(frame_bytes: bytes,
236 peer_x25519_public_hex: str) -> Optional[Dict[str, str]]:
237 """Encrypt frame data for peer using X25519+AES-256-GCM.
239 Reuses security/channel_encryption.py encrypt_for_peer().
241 Returns:
242 Envelope dict {eph, nonce, ct, v} or None if encryption unavailable.
243 """
244 try:
245 from security.channel_encryption import encrypt_for_peer
246 return encrypt_for_peer(frame_bytes, peer_x25519_public_hex)
247 except Exception as e:
248 logger.debug(f"Frame encryption failed: {e}")
249 return None
252def decrypt_frame(envelope: Dict[str, str]) -> Optional[bytes]:
253 """Decrypt frame envelope from peer.
255 Returns:
256 Decrypted bytes or None.
257 """
258 try:
259 from security.channel_encryption import decrypt_from_peer
260 return decrypt_from_peer(envelope)
261 except Exception as e:
262 logger.debug(f"Frame decryption failed: {e}")
263 return None
266def encrypt_event(event: dict,
267 peer_x25519_public_hex: str) -> Optional[Dict[str, str]]:
268 """Encrypt JSON event (input/clipboard/control) for peer.
270 Reuses security/channel_encryption.py encrypt_json_for_peer().
271 """
272 try:
273 from security.channel_encryption import encrypt_json_for_peer
274 return encrypt_json_for_peer(event, peer_x25519_public_hex)
275 except Exception as e:
276 logger.debug(f"Event encryption failed: {e}")
277 return None
280def decrypt_event(envelope: Dict[str, str]) -> Optional[dict]:
281 """Decrypt JSON event envelope from peer."""
282 try:
283 from security.channel_encryption import decrypt_json_from_peer
284 return decrypt_json_from_peer(envelope)
285 except Exception as e:
286 logger.debug(f"Event decryption failed: {e}")
287 return None