Coverage for integrations / remote_desktop / file_transfer.py: 90.9%
154 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
File Transfer — Chunked binary transfer over any transport tier.

Protocol:
  1. Sender: FILE_START {filename, size, sha256} → binary chunks (64KB) → FILE_END
  2. Receiver: accumulate chunks → verify SHA256 → FILE_ACK

DLP scan before sending via security/dlp_engine.py.
Works over DirectWebSocket, WAMP relay, or WireGuard transport.

Reuses:
  - transport.TransportChannel.send_event() for control messages
  - transport.TransportChannel.send_frame() for binary chunks
  - security.scan_file_transfer() for DLP scanning
15"""
17import hashlib
18import logging
19import os
20import time
21from dataclasses import dataclass, field
22from enum import Enum
23from typing import Callable, Dict, List, Optional
25logger = logging.getLogger('hevolve.remote_desktop')
class FileTransferState(Enum):
    """Lifecycle states a transfer moves through; values are the wire/JSON names."""

    # Nothing in flight yet.
    IDLE = 'idle'
    # Actively moving data in one direction or the other.
    SENDING = 'sending'
    RECEIVING = 'receiving'
    # Terminal states.
    COMPLETED = 'completed'
    ERROR = 'error'
@dataclass
class TransferProgress:
    """Mutable record describing how far a single file transfer has advanced."""

    # Identity and declared size of the file being moved.
    filename: str = ''
    total_bytes: int = 0
    # Running counters updated as chunks flow.
    transferred_bytes: int = 0
    chunks_sent: int = 0
    chunks_received: int = 0
    # Lifecycle information.
    state: FileTransferState = FileTransferState.IDLE
    error: Optional[str] = None
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    sha256: Optional[str] = None

    @property
    def percent(self) -> float:
        """Completion as a percentage capped at 100; 0.0 when total_bytes is 0."""
        if self.total_bytes == 0:
            return 0.0
        fraction = self.transferred_bytes / self.total_bytes
        return min(100.0, fraction * 100)

    def to_dict(self) -> dict:
        """Return a plain-dict summary (percent rounded to one decimal, state as its value)."""
        return dict(
            filename=self.filename,
            total_bytes=self.total_bytes,
            transferred_bytes=self.transferred_bytes,
            percent=round(self.percent, 1),
            state=self.state.value,
            error=self.error,
            sha256=self.sha256,
        )
class FileTransfer:
    """Chunked binary file transfer over a transport channel.

    Sending: ``send_file()`` emits a ``FILE_START`` control event, streams the
    file as binary frames of ``CHUNK_SIZE`` bytes, then emits ``FILE_END``.

    Receiving: call ``receive_file()`` once, then route the transport's
    callbacks to ``handle_event()`` (control messages) and ``handle_frame()``
    (binary chunks). The payload is buffered in memory, checked against the
    SHA256 announced in ``FILE_START``, and written to the save directory when
    ``FILE_END`` arrives.

    The transport object only needs ``send_event(dict)`` and
    ``send_frame(bytes)``; both are treated as failed when they return a
    falsy value.
    """

    CHUNK_SIZE = 65536  # 64KB per chunk

    def __init__(self):
        self._progress: Optional["TransferProgress"] = None
        self._on_progress: Optional[Callable[["TransferProgress"], None]] = None
        self._receive_buffer: bytearray = bytearray()
        self._receive_info: Optional[dict] = None
        self._save_dir: str = '.'

    def on_progress(self, callback: Callable[["TransferProgress"], None]) -> None:
        """Register the callback invoked with the TransferProgress after each update."""
        self._on_progress = callback

    def _sha256_of_file(self, path: str) -> str:
        """Return the hex SHA256 of the file at *path*, read in CHUNK_SIZE pieces."""
        digest = hashlib.sha256()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def send_file(self, transport, local_path: str) -> dict:
        """Send a file over a transport channel.

        Args:
            transport: TransportChannel instance
            local_path: Path to file to send

        Returns:
            On success: {success, filename, sha256, bytes_sent, chunks}.
            On failure: {success: False, error} (plus bytes_sent when the
            failure happened after chunks started flowing).
        """
        # isfile() rather than exists(): a directory "exists" but cannot be
        # opened for reading, which previously surfaced as an unhandled error.
        if not os.path.isfile(local_path):
            return {'success': False, 'error': f'File not found: {local_path}'}

        filename = os.path.basename(local_path)
        file_size = os.path.getsize(local_path)

        # DLP scan. Kept best-effort (the transfer proceeds when the scanner is
        # missing or crashes), but the failure is now logged instead of hidden.
        try:
            from integrations.remote_desktop.security import scan_file_transfer
            allowed, reason = scan_file_transfer(filename)
        except Exception as exc:
            logger.warning("DLP scan unavailable for %s; sending without it: %s",
                           filename, exc)
        else:
            if not allowed:
                return {'success': False, 'error': f'DLP blocked: {reason}'}

        # Initialize progress
        self._progress = TransferProgress(
            filename=filename,
            total_bytes=file_size,
            state=FileTransferState.SENDING,
            started_at=time.time(),
        )

        # Compute SHA256 up front so the receiver learns it in FILE_START.
        try:
            file_hash = self._sha256_of_file(local_path)
        except OSError as exc:
            self._progress.state = FileTransferState.ERROR
            self._progress.error = f'Read failed: {exc}'
            return {'success': False, 'error': f'Read failed: {exc}'}
        self._progress.sha256 = file_hash

        # Send FILE_START control message
        ok = transport.send_event({
            'type': 'file_ctrl',
            'action': 'FILE_START',
            'filename': filename,
            'size': file_size,
            'sha256': file_hash,
        })
        if not ok:
            self._progress.state = FileTransferState.ERROR
            self._progress.error = 'Failed to send FILE_START'
            return {'success': False, 'error': 'Transport send failed'}

        # Send chunks
        chunks_sent = 0
        bytes_sent = 0
        with open(local_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                if not transport.send_frame(chunk):
                    self._progress.state = FileTransferState.ERROR
                    self._progress.error = f'Chunk {chunks_sent} send failed'
                    return {
                        'success': False,
                        'error': f'Failed at chunk {chunks_sent}',
                        'bytes_sent': bytes_sent,
                    }

                chunks_sent += 1
                bytes_sent += len(chunk)
                self._progress.chunks_sent = chunks_sent
                self._progress.transferred_bytes = bytes_sent
                self._notify_progress()

        # Send FILE_END. Its result is checked like FILE_START's: without the
        # terminator the receiver never finalizes, so this is not a success.
        ok = transport.send_event({
            'type': 'file_ctrl',
            'action': 'FILE_END',
            'filename': filename,
            'sha256': file_hash,
        })
        if not ok:
            self._progress.state = FileTransferState.ERROR
            self._progress.error = 'Failed to send FILE_END'
            return {
                'success': False,
                'error': 'Transport send failed',
                'bytes_sent': bytes_sent,
            }

        self._progress.state = FileTransferState.COMPLETED
        self._progress.completed_at = time.time()
        self._notify_progress()

        logger.info("File sent: %s (%s bytes, %s chunks)",
                    filename, bytes_sent, chunks_sent)
        return {
            'success': True,
            'filename': filename,
            'sha256': file_hash,
            'bytes_sent': bytes_sent,
            'chunks': chunks_sent,
        }

    def send_files(self, transport, local_paths: list) -> list:
        """Send multiple files sequentially (for drag-and-drop batch transfers).

        Args:
            transport: TransportChannel to send on.
            local_paths: List of local file paths to transfer.

        Returns:
            List of result dicts (one per file), in input order. A failed
            file does not stop the remaining ones from being attempted.
        """
        return [self.send_file(transport, path) for path in local_paths]

    def receive_file(self, save_dir: str = '.') -> dict:
        """Prepare to receive a file.

        Call this before starting the transport event loop.
        The actual receiving happens via handle_event() and handle_frame().

        Args:
            save_dir: Directory received files are written to; created if missing.

        Returns:
            Setup status dict: {status: 'ready', save_dir}.
        """
        self._save_dir = save_dir
        self._receive_buffer = bytearray()
        self._receive_info = None
        self._progress = TransferProgress(state=FileTransferState.RECEIVING)

        os.makedirs(save_dir, exist_ok=True)
        return {'status': 'ready', 'save_dir': save_dir}

    def handle_event(self, event: dict) -> Optional[dict]:
        """Handle a file transfer control event.

        Called by the transport event callback. Events whose ``type`` is not
        ``file_ctrl`` are ignored.

        Returns:
            Result dict on FILE_END, None otherwise.
        """
        if event.get('type') != 'file_ctrl':
            return None

        action = event.get('action', '')

        if action == 'FILE_START':
            # A new FILE_START discards anything buffered so far.
            self._receive_info = {
                'filename': event.get('filename', 'unnamed'),
                'size': event.get('size', 0),
                'sha256': event.get('sha256', ''),
            }
            self._receive_buffer = bytearray()
            self._progress = TransferProgress(
                filename=self._receive_info['filename'],
                total_bytes=self._receive_info['size'],
                state=FileTransferState.RECEIVING,
                started_at=time.time(),
            )
            logger.info("Receiving file: %s (%s bytes)",
                        self._receive_info['filename'],
                        self._receive_info['size'])
            return None

        if action == 'FILE_END':
            return self._finalize_receive(event)

        return None

    def handle_frame(self, data: bytes) -> None:
        """Handle a binary chunk during file receive.

        Called by the transport frame callback during file transfer. Frames
        are dropped unless the current progress state is RECEIVING.
        """
        if self._progress and self._progress.state == FileTransferState.RECEIVING:
            self._receive_buffer.extend(data)
            self._progress.chunks_received += 1
            self._progress.transferred_bytes = len(self._receive_buffer)
            self._notify_progress()

    def _finalize_receive(self, event: dict) -> dict:
        """Finalize file receive — verify SHA256 and save.

        Returns:
            {success: True, path, filename, sha256, bytes_received} when the
            file was written, otherwise {success: False, error, ...}.
        """
        if not self._receive_info:
            return {'success': False, 'error': 'No FILE_START received'}

        filename = self._receive_info['filename']
        expected_hash = self._receive_info.get('sha256', '')

        # Verify SHA256 (skipped when the sender announced no hash).
        actual_hash = hashlib.sha256(self._receive_buffer).hexdigest()
        if expected_hash and actual_hash != expected_hash:
            self._progress.state = FileTransferState.ERROR
            self._progress.error = 'SHA256 mismatch'
            return {
                'success': False,
                'error': 'SHA256 mismatch (file corrupted)',
                'expected': expected_hash,
                'actual': actual_hash,
            }

        # Save file — basename() prevents directory traversal from sender
        save_path = os.path.join(self._save_dir, os.path.basename(filename))
        try:
            with open(save_path, 'wb') as f:
                f.write(self._receive_buffer)
        except Exception as e:
            # Deliberately broad: the name comes from the remote side, so
            # open() may fail with more than OSError (e.g. ValueError).
            self._progress.state = FileTransferState.ERROR
            self._progress.error = str(e)
            return {'success': False, 'error': f'Save failed: {e}'}

        self._progress.state = FileTransferState.COMPLETED
        self._progress.completed_at = time.time()
        self._progress.sha256 = actual_hash
        self._notify_progress()

        received_bytes = len(self._receive_buffer)
        logger.info("File received: %s → %s (%s bytes)",
                    filename, save_path, received_bytes)

        # Clean up so the buffered payload is released.
        self._receive_buffer = bytearray()
        self._receive_info = None

        return {
            'success': True,
            'path': save_path,
            'filename': filename,
            'sha256': actual_hash,
            'bytes_received': received_bytes,
        }

    def _notify_progress(self) -> None:
        """Invoke the progress callback, if any, without letting it break the transfer."""
        if self._on_progress and self._progress:
            try:
                self._on_progress(self._progress)
            except Exception:
                # Runs once per chunk, so keep this at debug level to avoid
                # flooding the log when a callback is consistently broken.
                logger.debug("Progress callback raised; ignoring", exc_info=True)

    def get_progress(self) -> Optional[dict]:
        """Return the current progress as a dict, or None before any transfer."""
        if self._progress:
            return self._progress.to_dict()
        return None