Coverage for integrations / remote_desktop / file_transfer.py: 90.9%

154 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2File Transfer — Chunked binary transfer over any transport tier. 

3 

4Protocol: 

5 1. Sender: FILE_START {name, size, sha256} → binary chunks (64KB) → FILE_END 

6 2. Receiver: accumulate chunks → verify SHA256 → FILE_ACK 

7 

8DLP scan before sending via security/dlp_engine.py. 

9Works over DirectWebSocket, WAMP relay, or WireGuard transport. 

10 

11Reuses: 

12 - transport.TransportChannel.send_event() for control messages 

13 - transport.TransportChannel.send_frame() for binary chunks 

14 - security.scan_file_transfer() for DLP scanning 

15""" 

16 

17import hashlib 

18import logging 

19import os 

20import time 

21from dataclasses import dataclass, field 

22from enum import Enum 

23from typing import Callable, Dict, List, Optional 

24 

25logger = logging.getLogger('hevolve.remote_desktop') 

26 

27 

28class FileTransferState(Enum): 

29 IDLE = 'idle' 

30 SENDING = 'sending' 

31 RECEIVING = 'receiving' 

32 COMPLETED = 'completed' 

33 ERROR = 'error' 

34 

35 

36@dataclass 

37class TransferProgress: 

38 """Progress tracking for a file transfer.""" 

39 filename: str = '' 

40 total_bytes: int = 0 

41 transferred_bytes: int = 0 

42 chunks_sent: int = 0 

43 chunks_received: int = 0 

44 state: FileTransferState = FileTransferState.IDLE 

45 error: Optional[str] = None 

46 started_at: Optional[float] = None 

47 completed_at: Optional[float] = None 

48 sha256: Optional[str] = None 

49 

50 @property 

51 def percent(self) -> float: 

52 if self.total_bytes == 0: 

53 return 0.0 

54 return min(100.0, (self.transferred_bytes / self.total_bytes) * 100) 

55 

56 def to_dict(self) -> dict: 

57 return { 

58 'filename': self.filename, 

59 'total_bytes': self.total_bytes, 

60 'transferred_bytes': self.transferred_bytes, 

61 'percent': round(self.percent, 1), 

62 'state': self.state.value, 

63 'error': self.error, 

64 'sha256': self.sha256, 

65 } 

66 

67 

68class FileTransfer: 

69 """Chunked binary file transfer over transport channel. 

70 

71 Sends files as a sequence of control messages + binary chunks. 

72 Works over any TransportChannel implementation. 

73 """ 

74 

75 CHUNK_SIZE = 65536 # 64KB per chunk 

76 

77 def __init__(self): 

78 self._progress: Optional[TransferProgress] = None 

79 self._on_progress: Optional[Callable[[TransferProgress], None]] = None 

80 self._receive_buffer: bytearray = bytearray() 

81 self._receive_info: Optional[dict] = None 

82 self._save_dir: str = '.' 

83 

84 def on_progress(self, callback: Callable[[TransferProgress], None]) -> None: 

85 """Register progress callback.""" 

86 self._on_progress = callback 

87 

88 def send_file(self, transport, local_path: str) -> dict: 

89 """Send a file over a transport channel. 

90 

91 Args: 

92 transport: TransportChannel instance 

93 local_path: Path to file to send 

94 

95 Returns: 

96 {success, sha256, bytes_sent, chunks, filename} 

97 """ 

98 if not os.path.exists(local_path): 

99 return {'success': False, 'error': f'File not found: {local_path}'} 

100 

101 filename = os.path.basename(local_path) 

102 file_size = os.path.getsize(local_path) 

103 

104 # DLP scan 

105 try: 

106 from integrations.remote_desktop.security import scan_file_transfer 

107 allowed, reason = scan_file_transfer(filename) 

108 if not allowed: 

109 return {'success': False, 'error': f'DLP blocked: {reason}'} 

110 except Exception: 

111 pass 

112 

113 # Initialize progress 

114 self._progress = TransferProgress( 

115 filename=filename, 

116 total_bytes=file_size, 

117 state=FileTransferState.SENDING, 

118 started_at=time.time(), 

119 ) 

120 

121 # Compute SHA256 

122 sha256 = hashlib.sha256() 

123 with open(local_path, 'rb') as f: 

124 while True: 

125 chunk = f.read(self.CHUNK_SIZE) 

126 if not chunk: 

127 break 

128 sha256.update(chunk) 

129 file_hash = sha256.hexdigest() 

130 self._progress.sha256 = file_hash 

131 

132 # Send FILE_START control message 

133 ok = transport.send_event({ 

134 'type': 'file_ctrl', 

135 'action': 'FILE_START', 

136 'filename': filename, 

137 'size': file_size, 

138 'sha256': file_hash, 

139 }) 

140 if not ok: 

141 self._progress.state = FileTransferState.ERROR 

142 self._progress.error = 'Failed to send FILE_START' 

143 return {'success': False, 'error': 'Transport send failed'} 

144 

145 # Send chunks 

146 chunks_sent = 0 

147 bytes_sent = 0 

148 with open(local_path, 'rb') as f: 

149 while True: 

150 chunk = f.read(self.CHUNK_SIZE) 

151 if not chunk: 

152 break 

153 

154 ok = transport.send_frame(chunk) 

155 if not ok: 

156 self._progress.state = FileTransferState.ERROR 

157 self._progress.error = f'Chunk {chunks_sent} send failed' 

158 return { 

159 'success': False, 

160 'error': f'Failed at chunk {chunks_sent}', 

161 'bytes_sent': bytes_sent, 

162 } 

163 

164 chunks_sent += 1 

165 bytes_sent += len(chunk) 

166 self._progress.chunks_sent = chunks_sent 

167 self._progress.transferred_bytes = bytes_sent 

168 self._notify_progress() 

169 

170 # Send FILE_END 

171 transport.send_event({ 

172 'type': 'file_ctrl', 

173 'action': 'FILE_END', 

174 'filename': filename, 

175 'sha256': file_hash, 

176 }) 

177 

178 self._progress.state = FileTransferState.COMPLETED 

179 self._progress.completed_at = time.time() 

180 self._notify_progress() 

181 

182 logger.info(f"File sent: {filename} ({bytes_sent} bytes, {chunks_sent} chunks)") 

183 return { 

184 'success': True, 

185 'filename': filename, 

186 'sha256': file_hash, 

187 'bytes_sent': bytes_sent, 

188 'chunks': chunks_sent, 

189 } 

190 

191 def send_files(self, transport, local_paths: list) -> list: 

192 """Send multiple files sequentially (for drag-and-drop batch transfers). 

193 

194 Args: 

195 transport: TransportChannel to send on. 

196 local_paths: List of local file paths to transfer. 

197 

198 Returns: 

199 List of result dicts (one per file). 

200 """ 

201 results = [] 

202 for path in local_paths: 

203 result = self.send_file(transport, path) 

204 results.append(result) 

205 return results 

206 

207 def receive_file(self, save_dir: str = '.') -> dict: 

208 """Prepare to receive a file. 

209 

210 Call this before starting the transport event loop. 

211 The actual receiving happens via handle_event() and handle_frame(). 

212 

213 Returns: 

214 Setup status dict. 

215 """ 

216 self._save_dir = save_dir 

217 self._receive_buffer = bytearray() 

218 self._receive_info = None 

219 self._progress = TransferProgress(state=FileTransferState.RECEIVING) 

220 

221 os.makedirs(save_dir, exist_ok=True) 

222 return {'status': 'ready', 'save_dir': save_dir} 

223 

224 def handle_event(self, event: dict) -> Optional[dict]: 

225 """Handle a file transfer control event. 

226 

227 Called by the transport event callback. 

228 Returns result dict on FILE_END, None otherwise. 

229 """ 

230 if event.get('type') != 'file_ctrl': 

231 return None 

232 

233 action = event.get('action', '') 

234 

235 if action == 'FILE_START': 

236 self._receive_info = { 

237 'filename': event.get('filename', 'unnamed'), 

238 'size': event.get('size', 0), 

239 'sha256': event.get('sha256', ''), 

240 } 

241 self._receive_buffer = bytearray() 

242 self._progress = TransferProgress( 

243 filename=self._receive_info['filename'], 

244 total_bytes=self._receive_info['size'], 

245 state=FileTransferState.RECEIVING, 

246 started_at=time.time(), 

247 ) 

248 logger.info(f"Receiving file: {self._receive_info['filename']} " 

249 f"({self._receive_info['size']} bytes)") 

250 return None 

251 

252 if action == 'FILE_END': 

253 return self._finalize_receive(event) 

254 

255 return None 

256 

257 def handle_frame(self, data: bytes) -> None: 

258 """Handle a binary chunk during file receive. 

259 

260 Called by the transport frame callback during file transfer. 

261 """ 

262 if self._progress and self._progress.state == FileTransferState.RECEIVING: 

263 self._receive_buffer.extend(data) 

264 self._progress.chunks_received += 1 

265 self._progress.transferred_bytes = len(self._receive_buffer) 

266 self._notify_progress() 

267 

268 def _finalize_receive(self, event: dict) -> dict: 

269 """Finalize file receive — verify SHA256 and save.""" 

270 if not self._receive_info: 

271 return {'success': False, 'error': 'No FILE_START received'} 

272 

273 filename = self._receive_info['filename'] 

274 expected_hash = self._receive_info.get('sha256', '') 

275 

276 # Verify SHA256 

277 actual_hash = hashlib.sha256(self._receive_buffer).hexdigest() 

278 if expected_hash and actual_hash != expected_hash: 

279 self._progress.state = FileTransferState.ERROR 

280 self._progress.error = 'SHA256 mismatch' 

281 return { 

282 'success': False, 

283 'error': 'SHA256 mismatch (file corrupted)', 

284 'expected': expected_hash, 

285 'actual': actual_hash, 

286 } 

287 

288 # Save file — basename() prevents directory traversal from sender 

289 save_path = os.path.join(self._save_dir, os.path.basename(filename)) 

290 try: 

291 with open(save_path, 'wb') as f: 

292 f.write(self._receive_buffer) 

293 except Exception as e: 

294 self._progress.state = FileTransferState.ERROR 

295 self._progress.error = str(e) 

296 return {'success': False, 'error': f'Save failed: {e}'} 

297 

298 self._progress.state = FileTransferState.COMPLETED 

299 self._progress.completed_at = time.time() 

300 self._progress.sha256 = actual_hash 

301 self._notify_progress() 

302 

303 logger.info(f"File received: {filename} → {save_path} " 

304 f"({len(self._receive_buffer)} bytes)") 

305 

306 # Clean up 

307 received_bytes = len(self._receive_buffer) 

308 self._receive_buffer = bytearray() 

309 self._receive_info = None 

310 

311 return { 

312 'success': True, 

313 'path': save_path, 

314 'filename': filename, 

315 'sha256': actual_hash, 

316 'bytes_received': received_bytes, 

317 } 

318 

319 def _notify_progress(self) -> None: 

320 """Notify progress callback.""" 

321 if self._on_progress and self._progress: 

322 try: 

323 self._on_progress(self._progress) 

324 except Exception: 

325 pass 

326 

327 def get_progress(self) -> Optional[dict]: 

328 """Get current transfer progress.""" 

329 if self._progress: 

330 return self._progress.to_dict() 

331 return None