Coverage for integrations / channels / media / files.py: 64.4%

253 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2File Manager for file handling operations. 

3 

4Provides download, upload, and file management functionality. 

5""" 

6 

7import asyncio 

8import os 

9import hashlib 

10import mimetypes 

11import time 

12from dataclasses import dataclass, field 

13from enum import Enum 

14from typing import Optional, List, Dict, Any, Union 

15from pathlib import Path 

16from urllib.parse import urlparse, unquote 

17import logging 

18 

# Module-level logger; configuration is left to the application.
logger = logging.getLogger(__name__)

# Docker-compatible paths: defaults live under /tmp and /app so the module
# works unchanged inside containers; each is overridable via environment.
TEMP_DIR = os.environ.get("FILE_TEMP_DIR", "/tmp/files")    # scratch space for downloads
APP_TEMP_DIR = os.environ.get("APP_TEMP_DIR", "/app/temp")  # application-level temp dir
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/app/uploads")   # destination for uploads

25 

26 

class FileStatus(Enum):
    """Lifecycle status of a file operation (download or upload)."""
    PENDING = "pending"          # queued, not yet started
    DOWNLOADING = "downloading"  # transfer from a remote URL in progress
    UPLOADING = "uploading"      # transfer to the storage backend in progress
    COMPLETED = "completed"      # operation finished successfully
    FAILED = "failed"            # operation aborted with an error
    EXPIRED = "expired"          # file passed its expiry time

35 

36 

class StorageBackend(Enum):
    """Storage backend types supported for uploads."""
    LOCAL = "local"  # local filesystem (served by a web server)
    S3 = "s3"        # Amazon S3
    GCS = "gcs"      # Google Cloud Storage
    AZURE = "azure"  # Azure Blob Storage

43 

44 

@dataclass
class FileInfo:
    """Metadata record for a tracked file."""
    file_id: str
    filename: str
    size: int
    mime_type: str
    url: Optional[str] = None
    local_path: Optional[str] = None
    channel: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    expires_at: Optional[float] = None
    checksum: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this record to a plain dictionary."""
        return dict(
            file_id=self.file_id,
            filename=self.filename,
            size=self.size,
            mime_type=self.mime_type,
            url=self.url,
            local_path=self.local_path,
            channel=self.channel,
            created_at=self.created_at,
            expires_at=self.expires_at,
            checksum=self.checksum,
            metadata=self.metadata,
        )

    def is_expired(self) -> bool:
        """Check if file has expired (never expires when expires_at is None)."""
        return self.expires_at is not None and time.time() > self.expires_at

    def get_extension(self) -> str:
        """Get the lowercased file extension, or '' when there is no dot."""
        _, dot, ext = self.filename.rpartition(".")
        return ext.lower() if dot else ""

86 

87 

@dataclass
class DownloadResult:
    """Outcome of a download operation."""
    success: bool
    file_path: Optional[str] = None
    file_info: Optional[FileInfo] = None
    error: Optional[str] = None
    download_time: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dictionary; nested FileInfo is expanded."""
        info = self.file_info
        return dict(
            success=self.success,
            file_path=self.file_path,
            file_info=info.to_dict() if info else None,
            error=self.error,
            download_time=self.download_time,
        )

105 

106 

@dataclass
class UploadResult:
    """Outcome of an upload operation."""
    success: bool
    url: Optional[str] = None
    file_id: Optional[str] = None
    file_info: Optional[FileInfo] = None
    error: Optional[str] = None
    upload_time: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dictionary; nested FileInfo is expanded."""
        info = self.file_info
        return dict(
            success=self.success,
            url=self.url,
            file_id=self.file_id,
            file_info=info.to_dict() if info else None,
            error=self.error,
            upload_time=self.upload_time,
        )

126 

127 

class FileManager:
    """
    File manager for handling file operations.

    Provides download, upload, and file management across channels.
    Files are tracked in-memory by file_id (lost on restart); the
    storage backend (local filesystem by default) determines where
    uploads land.
    """

    # Maximum file sizes per channel (bytes)
    CHANNEL_MAX_SIZES = {
        "telegram": 50 * 1024 * 1024,     # 50MB
        "discord": 8 * 1024 * 1024,       # 8MB (without Nitro)
        "slack": 1 * 1024 * 1024 * 1024,  # 1GB
        "whatsapp": 16 * 1024 * 1024,     # 16MB
        "default": 25 * 1024 * 1024       # 25MB
    }

    # Allowed file extensions per channel
    CHANNEL_ALLOWED_EXTENSIONS = {
        "telegram": ["jpg", "jpeg", "png", "gif", "webp", "mp4", "mp3", "pdf", "doc", "docx", "zip"],
        "discord": ["jpg", "jpeg", "png", "gif", "webp", "mp4", "mp3", "wav", "pdf", "txt"],
        "slack": ["jpg", "jpeg", "png", "gif", "pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", "zip"],
        "whatsapp": ["jpg", "jpeg", "png", "gif", "mp4", "mp3", "pdf", "doc", "docx"],
        "default": ["jpg", "jpeg", "png", "gif", "pdf", "txt"]
    }

    def __init__(
        self,
        storage_backend: Union[StorageBackend, str] = StorageBackend.LOCAL,
        temp_dir: Optional[str] = None,
        upload_dir: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize file manager.

        Args:
            storage_backend: Storage backend to use (enum member or its
                string value, case-insensitive)
            temp_dir: Temporary directory for downloads (default TEMP_DIR)
            upload_dir: Directory for uploads (default UPLOAD_DIR)
            config: Additional configuration options (e.g. bucket names)
        """
        if isinstance(storage_backend, str):
            storage_backend = StorageBackend(storage_backend.lower())

        self.storage_backend = storage_backend
        self.temp_dir = temp_dir or TEMP_DIR
        self.upload_dir = upload_dir or UPLOAD_DIR
        self.config = config or {}

        # File tracking: file_id -> FileInfo (in-memory only)
        self._files: Dict[str, FileInfo] = {}

        # Cloud storage clients (lazy initialized)
        self._s3_client = None
        self._gcs_client = None
        self._azure_client = None

        # Ensure directories exist
        self._ensure_dirs()

    def _ensure_dirs(self) -> None:
        """Ensure required directories exist (Docker-compatible)."""
        for dir_path in [self.temp_dir, self.upload_dir, TEMP_DIR, APP_TEMP_DIR]:
            try:
                Path(dir_path).mkdir(parents=True, exist_ok=True)
            except (PermissionError, OSError) as e:
                # Non-fatal: directory may be read-only or created later
                logger.warning(f"Could not create directory {dir_path}: {e}")

    def _generate_file_id(
        self,
        content: Optional[bytes] = None,
        filename: Optional[str] = None
    ) -> str:
        """Generate a unique 16-hex-char file ID from time, name and content prefix."""
        data = f"{time.time()}{filename or ''}"
        if content:
            # Hash only the first KB: enough entropy, cheap for large files.
            data += hashlib.md5(content[:1024]).hexdigest()
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def _get_mime_type(self, filename: str) -> str:
        """Get MIME type for filename, falling back to octet-stream."""
        mime_type, _ = mimetypes.guess_type(filename)
        return mime_type or "application/octet-stream"

    def _get_filename_from_url(self, url: str) -> str:
        """Extract a filename from a URL path; synthesize one if empty."""
        parsed = urlparse(url)
        path = unquote(parsed.path)
        filename = os.path.basename(path)
        return filename or f"file_{int(time.time())}"

    async def download(
        self,
        url: str,
        destination: Optional[str] = None,
        timeout: int = 60,
        max_size: Optional[int] = None
    ) -> str:
        """
        Download file from URL.

        Args:
            url: URL to download from
            destination: Destination path (auto-generated under temp_dir
                if not provided)
            timeout: Download timeout in seconds
            max_size: Maximum file size to download

        Returns:
            Path to downloaded file

        Raises:
            Exception: re-raised after logging on any download failure
        """
        start_time = time.time()

        try:
            # Determine filename and destination
            filename = self._get_filename_from_url(url)
            if destination is None:
                destination = os.path.join(self.temp_dir, filename)

            logger.info(f"Downloading {url} to {destination}")

            # Ensure destination directory exists
            Path(destination).parent.mkdir(parents=True, exist_ok=True)

            # Would use aiohttp or httpx for actual download
            # async with aiohttp.ClientSession() as session:
            #     async with session.get(url, timeout=timeout) as response:
            #         if response.status != 200:
            #             raise Exception(f"Download failed: {response.status}")
            #
            #         # Check content length
            #         content_length = response.headers.get("content-length")
            #         if content_length and max_size and int(content_length) > max_size:
            #             raise Exception(f"File too large: {content_length} > {max_size}")
            #
            #         with open(destination, 'wb') as f:
            #             async for chunk in response.content.iter_chunked(8192):
            #                 f.write(chunk)

            # Placeholder - simulated successful download
            Path(destination).touch()

            # Track file
            file_id = self._generate_file_id(filename=filename)
            file_info = FileInfo(
                file_id=file_id,
                filename=filename,
                size=0,  # Would be actual size
                mime_type=self._get_mime_type(filename),
                url=url,
                local_path=destination,
                metadata={"download_time": time.time() - start_time}
            )
            self._files[file_id] = file_info

            return destination

        except Exception as e:
            logger.error(f"Download failed: {e}")
            raise

    async def upload(
        self,
        file_path: str,
        channel: str,
        filename: Optional[str] = None
    ) -> str:
        """
        Upload file to storage and return URL.

        Args:
            file_path: Path to file to upload
            channel: Target channel (for size/type validation)
            filename: Override filename

        Returns:
            URL to uploaded file

        Raises:
            FileNotFoundError: if file_path does not exist
            ValueError: if the file violates the channel's size/type limits
        """
        start_time = time.time()

        try:
            path = Path(file_path)
            if not path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")

            filename = filename or path.name
            file_size = path.stat().st_size

            # Validate file for channel
            self._validate_for_channel(filename, file_size, channel)

            logger.info(f"Uploading {file_path} for {channel}")

            # Read file content
            with open(path, 'rb') as f:
                content = f.read()

            # Generate file ID and checksum (md5 used as integrity tag, not security)
            file_id = self._generate_file_id(content, filename)
            checksum = hashlib.md5(content).hexdigest()

            # Upload based on backend; unknown backends fall back to local
            if self.storage_backend == StorageBackend.LOCAL:
                url = await self._upload_local(file_id, filename, content)
            elif self.storage_backend == StorageBackend.S3:
                url = await self._upload_s3(file_id, filename, content)
            elif self.storage_backend == StorageBackend.GCS:
                url = await self._upload_gcs(file_id, filename, content)
            elif self.storage_backend == StorageBackend.AZURE:
                url = await self._upload_azure(file_id, filename, content)
            else:
                url = await self._upload_local(file_id, filename, content)

            # Track file
            file_info = FileInfo(
                file_id=file_id,
                filename=filename,
                size=file_size,
                mime_type=self._get_mime_type(filename),
                url=url,
                local_path=file_path,
                channel=channel,
                checksum=checksum,
                metadata={"upload_time": time.time() - start_time}
            )
            self._files[file_id] = file_info

            return url

        except Exception as e:
            logger.error(f"Upload failed: {e}")
            raise

    async def _upload_local(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to local storage under upload_dir/<file_id>/<filename>."""
        dest_path = os.path.join(self.upload_dir, file_id, filename)
        Path(dest_path).parent.mkdir(parents=True, exist_ok=True)

        with open(dest_path, 'wb') as f:
            f.write(content)

        # Return local file URL (would be served by web server).
        # Fix: URL must carry the actual filename, not a placeholder.
        return f"/files/{file_id}/{filename}"

    async def _upload_s3(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to Amazon S3 (placeholder implementation)."""
        # Would use boto3
        # s3 = boto3.client('s3')
        # bucket = self.config.get('s3_bucket')
        # key = f"{file_id}/{filename}"
        # s3.put_object(Bucket=bucket, Key=key, Body=content)
        # return f"https://{bucket}.s3.amazonaws.com/{key}"
        return f"https://s3.example.com/{file_id}/{filename}"

    async def _upload_gcs(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to Google Cloud Storage (placeholder implementation)."""
        # Would use google-cloud-storage
        return f"https://storage.googleapis.com/bucket/{file_id}/{filename}"

    async def _upload_azure(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to Azure Blob Storage (placeholder implementation)."""
        # Would use azure-storage-blob
        return f"https://account.blob.core.windows.net/container/{file_id}/{filename}"

    def _validate_for_channel(
        self,
        filename: str,
        file_size: int,
        channel: str
    ) -> None:
        """
        Validate file against channel restrictions.

        Raises:
            ValueError: when the file is too large or its extension is
                not allowed for the channel. Extension-less files pass.
        """
        # Check size
        max_size = self.CHANNEL_MAX_SIZES.get(
            channel.lower(),
            self.CHANNEL_MAX_SIZES["default"]
        )
        if file_size > max_size:
            raise ValueError(
                f"File too large for {channel}: {file_size} > {max_size}"
            )

        # Check extension (empty extension is deliberately allowed)
        ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
        allowed = self.CHANNEL_ALLOWED_EXTENSIONS.get(
            channel.lower(),
            self.CHANNEL_ALLOWED_EXTENSIONS["default"]
        )
        if ext and ext not in allowed:
            raise ValueError(
                f"File type '{ext}' not allowed for {channel}"
            )

    async def get_info(
        self,
        file_id: str,
        channel: Optional[str] = None
    ) -> FileInfo:
        """
        Get information about a tracked file.

        Args:
            file_id: File identifier
            channel: Channel context (for channel-specific info)

        Returns:
            FileInfo object

        Raises:
            FileNotFoundError: if the file_id is not tracked
        """
        if file_id in self._files:
            return self._files[file_id]

        # Would query storage backend for file info
        raise FileNotFoundError(f"File not found: {file_id}")

    async def get_info_from_path(self, file_path: str) -> FileInfo:
        """
        Build file info from a local path.

        Args:
            file_path: Path to file

        Returns:
            FileInfo object (checksum covers only the first 1KB)

        Raises:
            FileNotFoundError: if the path does not exist
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        stat = path.stat()

        with open(path, 'rb') as f:
            content_start = f.read(1024)
        checksum = hashlib.md5(content_start).hexdigest()

        return FileInfo(
            file_id=self._generate_file_id(content_start, path.name),
            filename=path.name,
            size=stat.st_size,
            mime_type=self._get_mime_type(path.name),
            local_path=str(path),
            created_at=stat.st_ctime,
            checksum=checksum
        )

    def cleanup_temp(self, max_age_hours: int = 24) -> int:
        """
        Clean up temporary files older than max_age.

        Args:
            max_age_hours: Maximum age in hours

        Returns:
            Number of files deleted (on-disk plus dropped tracking entries)
        """
        deleted = 0
        max_age_seconds = max_age_hours * 3600
        cutoff_time = time.time() - max_age_seconds

        for dir_path in [self.temp_dir, TEMP_DIR]:
            try:
                path = Path(dir_path)
                if not path.exists():
                    continue

                for file_path in path.rglob("*"):
                    if file_path.is_file():
                        try:
                            if file_path.stat().st_mtime < cutoff_time:
                                file_path.unlink()
                                deleted += 1
                                logger.debug(f"Deleted temp file: {file_path}")
                        except (PermissionError, OSError) as e:
                            logger.warning(f"Could not delete {file_path}: {e}")

            except Exception as e:
                logger.error(f"Error cleaning temp directory {dir_path}: {e}")

        # Also drop tracked entries that expired or aged out
        expired_ids = [
            fid for fid, info in self._files.items()
            if info.is_expired() or (info.created_at and info.created_at < cutoff_time)
        ]
        for file_id in expired_ids:
            del self._files[file_id]
            deleted += 1

        logger.info(f"Cleaned up {deleted} temporary files")
        return deleted

    async def delete(self, file_id: str) -> bool:
        """
        Delete a tracked file (local copy plus backend object).

        Args:
            file_id: File identifier

        Returns:
            True if deleted successfully, False if file_id is unknown
        """
        if file_id not in self._files:
            return False

        file_info = self._files[file_id]

        # Delete local file (best-effort)
        if file_info.local_path:
            try:
                Path(file_info.local_path).unlink(missing_ok=True)
            except Exception as e:
                logger.warning(f"Could not delete local file: {e}")

        # Delete from storage backend
        if self.storage_backend == StorageBackend.S3:
            # Would delete from S3
            pass
        elif self.storage_backend == StorageBackend.GCS:
            # Would delete from GCS
            pass
        elif self.storage_backend == StorageBackend.AZURE:
            # Would delete from Azure
            pass

        # Remove from tracking
        del self._files[file_id]
        return True

    async def copy(
        self,
        source_path: str,
        dest_path: str
    ) -> str:
        """
        Copy a file, creating destination directories as needed.

        Args:
            source_path: Source file path
            dest_path: Destination file path

        Returns:
            Destination path

        Raises:
            FileNotFoundError: if the source does not exist
        """
        import shutil

        source = Path(source_path)
        if not source.exists():
            raise FileNotFoundError(f"Source not found: {source_path}")

        dest = Path(dest_path)
        dest.parent.mkdir(parents=True, exist_ok=True)

        # copy2 preserves metadata (mtime etc.)
        shutil.copy2(source, dest)
        return str(dest)

    async def move(
        self,
        source_path: str,
        dest_path: str
    ) -> str:
        """
        Move a file, creating destination directories as needed.

        Args:
            source_path: Source file path
            dest_path: Destination file path

        Returns:
            Destination path

        Raises:
            FileNotFoundError: if the source does not exist
        """
        import shutil

        source = Path(source_path)
        if not source.exists():
            raise FileNotFoundError(f"Source not found: {source_path}")

        dest = Path(dest_path)
        dest.parent.mkdir(parents=True, exist_ok=True)

        shutil.move(source, dest)
        return str(dest)

    def get_temp_path(self, prefix: str = "file", extension: str = "") -> str:
        """
        Get a unique temporary file path.

        Args:
            prefix: File name prefix
            extension: File extension (without dot)

        Returns:
            Temporary file path (Docker-compatible); the file is not created
        """
        timestamp = int(time.time() * 1000)
        # md5 of the timestamp just disambiguates same-millisecond calls
        random_hash = hashlib.md5(str(timestamp).encode()).hexdigest()[:8]
        filename = f"{prefix}_{timestamp}_{random_hash}"
        if extension:
            filename += f".{extension}"
        return os.path.join(self.temp_dir, filename)

    def format_size(self, size: int) -> str:
        """Format a byte count in human-readable form (e.g. '1.5 MB')."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024  # becomes float after the first division
        return f"{size:.1f} TB"

    def get_channel_limits(self, channel: str) -> Dict[str, Any]:
        """Get file limits for a channel (unknown channels get defaults)."""
        max_size = self.CHANNEL_MAX_SIZES.get(
            channel.lower(),
            self.CHANNEL_MAX_SIZES["default"]
        )
        return {
            "max_size": max_size,
            "max_size_formatted": self.format_size(max_size),
            "allowed_extensions": self.CHANNEL_ALLOWED_EXTENSIONS.get(
                channel.lower(),
                self.CHANNEL_ALLOWED_EXTENSIONS["default"]
            )
        }

    def list_files(
        self,
        channel: Optional[str] = None,
        include_expired: bool = False
    ) -> List[FileInfo]:
        """
        List tracked files, newest first.

        Args:
            channel: Filter by channel
            include_expired: Include expired files

        Returns:
            List of FileInfo objects sorted by created_at descending
        """
        files = list(self._files.values())

        if channel:
            files = [f for f in files if f.channel == channel]

        if not include_expired:
            files = [f for f in files if not f.is_expired()]

        return sorted(files, key=lambda f: f.created_at, reverse=True)

    def get_storage_info(self) -> Dict[str, Any]:
        """Get information about storage configuration."""
        return {
            "backend": self.storage_backend.value,
            "temp_dir": self.temp_dir,
            "upload_dir": self.upload_dir,
            "tracked_files": len(self._files),
            "supported_channels": list(self.CHANNEL_MAX_SIZES.keys())
        }