Coverage for integrations / channels / media / files.py: 64.4%
253 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2File Manager for file handling operations.
4Provides download, upload, and file management functionality.
5"""
7import asyncio
8import os
9import hashlib
10import mimetypes
11import time
12from dataclasses import dataclass, field
13from enum import Enum
14from typing import Optional, List, Dict, Any, Union
15from pathlib import Path
16from urllib.parse import urlparse, unquote
17import logging
19logger = logging.getLogger(__name__)
21# Docker-compatible paths
22TEMP_DIR = os.environ.get("FILE_TEMP_DIR", "/tmp/files")
23APP_TEMP_DIR = os.environ.get("APP_TEMP_DIR", "/app/temp")
24UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/app/uploads")
class FileStatus(Enum):
    """Lifecycle states a tracked file operation can be in."""

    PENDING = "pending"          # queued, work not started yet
    DOWNLOADING = "downloading"  # transfer from a remote URL in progress
    UPLOADING = "uploading"      # transfer to a storage backend in progress
    COMPLETED = "completed"      # operation finished successfully
    FAILED = "failed"            # operation aborted with an error
    EXPIRED = "expired"          # file passed its expiry timestamp
class StorageBackend(Enum):
    """Supported storage backends for uploaded files."""

    LOCAL = "local"  # filesystem under the manager's upload_dir
    S3 = "s3"        # Amazon S3
    GCS = "gcs"      # Google Cloud Storage
    AZURE = "azure"  # Azure Blob Storage
@dataclass
class FileInfo:
    """Metadata record for a single tracked file."""

    file_id: str
    filename: str
    size: int
    mime_type: str
    url: Optional[str] = None
    local_path: Optional[str] = None
    channel: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    expires_at: Optional[float] = None
    checksum: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (the metadata dict is shared, not copied)."""
        keys = (
            "file_id", "filename", "size", "mime_type", "url",
            "local_path", "channel", "created_at", "expires_at",
            "checksum", "metadata",
        )
        return {key: getattr(self, key) for key in keys}

    def is_expired(self) -> bool:
        """Return True once the clock passes expires_at; never expires when None."""
        return self.expires_at is not None and time.time() > self.expires_at

    def get_extension(self) -> str:
        """Return the lowercase extension after the last dot, or '' if there is none."""
        _, dot, ext = self.filename.rpartition(".")
        return ext.lower() if dot else ""
@dataclass
class DownloadResult:
    """Outcome of a single download operation."""

    success: bool
    file_path: Optional[str] = None
    file_info: Optional[FileInfo] = None
    error: Optional[str] = None
    download_time: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize, expanding the nested FileInfo when one is attached."""
        info = self.file_info
        return {
            "success": self.success,
            "file_path": self.file_path,
            "file_info": info.to_dict() if info is not None else None,
            "error": self.error,
            "download_time": self.download_time,
        }
@dataclass
class UploadResult:
    """Outcome of a single upload operation."""

    success: bool
    url: Optional[str] = None
    file_id: Optional[str] = None
    file_info: Optional[FileInfo] = None
    error: Optional[str] = None
    upload_time: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize, expanding the nested FileInfo when one is attached."""
        info = self.file_info
        return {
            "success": self.success,
            "url": self.url,
            "file_id": self.file_id,
            "file_info": info.to_dict() if info is not None else None,
            "error": self.error,
            "upload_time": self.upload_time,
        }
class FileManager:
    """
    File manager for handling file operations.

    Provides download, upload, and file management across channels,
    with per-channel size/extension validation and pluggable storage
    backends (local filesystem, S3, GCS, Azure).
    """

    # Maximum file sizes per channel (bytes)
    CHANNEL_MAX_SIZES = {
        "telegram": 50 * 1024 * 1024,      # 50MB
        "discord": 8 * 1024 * 1024,        # 8MB (without Nitro)
        "slack": 1 * 1024 * 1024 * 1024,   # 1GB
        "whatsapp": 16 * 1024 * 1024,      # 16MB
        "default": 25 * 1024 * 1024        # 25MB
    }

    # Allowed file extensions per channel
    CHANNEL_ALLOWED_EXTENSIONS = {
        "telegram": ["jpg", "jpeg", "png", "gif", "webp", "mp4", "mp3", "pdf", "doc", "docx", "zip"],
        "discord": ["jpg", "jpeg", "png", "gif", "webp", "mp4", "mp3", "wav", "pdf", "txt"],
        "slack": ["jpg", "jpeg", "png", "gif", "pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", "zip"],
        "whatsapp": ["jpg", "jpeg", "png", "gif", "mp4", "mp3", "pdf", "doc", "docx"],
        "default": ["jpg", "jpeg", "png", "gif", "pdf", "txt"]
    }

    def __init__(
        self,
        storage_backend: Union[StorageBackend, str] = StorageBackend.LOCAL,
        temp_dir: Optional[str] = None,
        upload_dir: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize file manager.

        Args:
            storage_backend: Storage backend to use (enum or its string value)
            temp_dir: Temporary directory for downloads
            upload_dir: Directory for uploads
            config: Additional configuration options

        Raises:
            ValueError: If a string backend name is not a valid StorageBackend.
        """
        # Accept the enum's string value for convenience (case-insensitive).
        if isinstance(storage_backend, str):
            storage_backend = StorageBackend(storage_backend.lower())

        self.storage_backend = storage_backend
        self.temp_dir = temp_dir or TEMP_DIR
        self.upload_dir = upload_dir or UPLOAD_DIR
        self.config = config or {}

        # In-memory registry of files this manager knows about, keyed by file_id.
        self._files: Dict[str, FileInfo] = {}

        # Cloud storage clients (lazy initialized)
        self._s3_client = None
        self._gcs_client = None
        self._azure_client = None

        # Ensure directories exist
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Ensure required directories exist (Docker-compatible).

        Failures are logged but never raised, so the manager can still be
        constructed in read-only environments.
        """
        for dir_path in [self.temp_dir, self.upload_dir, TEMP_DIR, APP_TEMP_DIR]:
            try:
                Path(dir_path).mkdir(parents=True, exist_ok=True)
            except (PermissionError, OSError) as e:
                logger.warning(f"Could not create directory {dir_path}: {e}")

    def _generate_file_id(self, content: Optional[bytes] = None, filename: Optional[str] = None) -> str:
        """Generate a short unique file ID.

        Derived from the current time, the filename, and (when available)
        a digest of the first 1KB of content. Not cryptographically
        meaningful — used only as an identifier.
        """
        data = f"{time.time()}{filename or ''}"
        if content:
            data += hashlib.md5(content[:1024]).hexdigest()
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def _get_mime_type(self, filename: str) -> str:
        """Get MIME type for filename, defaulting to application/octet-stream."""
        mime_type, _ = mimetypes.guess_type(filename)
        return mime_type or "application/octet-stream"

    def _get_filename_from_url(self, url: str) -> str:
        """Extract a filename from a URL path, falling back to a timestamped name."""
        parsed = urlparse(url)
        path = unquote(parsed.path)
        filename = os.path.basename(path)
        return filename or f"file_{int(time.time())}"

    async def download(
        self,
        url: str,
        destination: Optional[str] = None,
        timeout: int = 60,
        max_size: Optional[int] = None
    ) -> str:
        """
        Download file from URL.

        Args:
            url: URL to download from
            destination: Destination path (auto-generated if not provided)
            timeout: Download timeout in seconds
            max_size: Maximum file size to download

        Returns:
            Path to downloaded file

        Raises:
            Exception: Re-raises any failure after logging it.
        """
        start_time = time.time()

        try:
            # Determine filename and destination
            filename = self._get_filename_from_url(url)
            if destination is None:
                destination = os.path.join(self.temp_dir, filename)

            logger.info(f"Downloading {url} to {destination}")

            # Ensure destination directory exists
            Path(destination).parent.mkdir(parents=True, exist_ok=True)

            # Would use aiohttp or httpx for actual download
            # async with aiohttp.ClientSession() as session:
            #     async with session.get(url, timeout=timeout) as response:
            #         if response.status != 200:
            #             raise Exception(f"Download failed: {response.status}")
            #
            #         # Check content length
            #         content_length = response.headers.get("content-length")
            #         if content_length and max_size and int(content_length) > max_size:
            #             raise Exception(f"File too large: {content_length} > {max_size}")
            #
            #         with open(destination, 'wb') as f:
            #             async for chunk in response.content.iter_chunked(8192):
            #                 f.write(chunk)

            # Placeholder - simulated successful download (creates an empty file)
            Path(destination).touch()

            # Track file
            file_id = self._generate_file_id(filename=filename)
            file_info = FileInfo(
                file_id=file_id,
                filename=filename,
                size=0,  # Would be actual size
                mime_type=self._get_mime_type(filename),
                url=url,
                local_path=destination,
                metadata={"download_time": time.time() - start_time}
            )
            self._files[file_id] = file_info

            return destination

        except Exception as e:
            logger.error(f"Download failed: {e}")
            raise

    async def upload(
        self,
        file_path: str,
        channel: str,
        filename: Optional[str] = None
    ) -> str:
        """
        Upload file to storage and return URL.

        Args:
            file_path: Path to file to upload
            channel: Target channel (for size/type validation)
            filename: Override filename

        Returns:
            URL to uploaded file

        Raises:
            FileNotFoundError: If file_path does not exist.
            ValueError: If the file violates the channel's size/type limits.
        """
        start_time = time.time()

        try:
            path = Path(file_path)
            if not path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")

            filename = filename or path.name
            file_size = path.stat().st_size

            # Validate file for channel
            self._validate_for_channel(filename, file_size, channel)

            logger.info(f"Uploading {file_path} for {channel}")

            # Read file content
            with open(path, 'rb') as f:
                content = f.read()

            # Generate file ID and checksum
            file_id = self._generate_file_id(content, filename)
            checksum = hashlib.md5(content).hexdigest()

            # Upload based on backend (unknown backends fall back to local)
            if self.storage_backend == StorageBackend.LOCAL:
                url = await self._upload_local(file_id, filename, content)
            elif self.storage_backend == StorageBackend.S3:
                url = await self._upload_s3(file_id, filename, content)
            elif self.storage_backend == StorageBackend.GCS:
                url = await self._upload_gcs(file_id, filename, content)
            elif self.storage_backend == StorageBackend.AZURE:
                url = await self._upload_azure(file_id, filename, content)
            else:
                url = await self._upload_local(file_id, filename, content)

            # Track file
            file_info = FileInfo(
                file_id=file_id,
                filename=filename,
                size=file_size,
                mime_type=self._get_mime_type(filename),
                url=url,
                local_path=file_path,
                channel=channel,
                checksum=checksum,
                metadata={"upload_time": time.time() - start_time}
            )
            self._files[file_id] = file_info

            return url

        except Exception as e:
            logger.error(f"Upload failed: {e}")
            raise

    async def _upload_local(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to local storage under upload_dir/<file_id>/<filename>."""
        dest_path = os.path.join(self.upload_dir, file_id, filename)
        Path(dest_path).parent.mkdir(parents=True, exist_ok=True)

        with open(dest_path, 'wb') as f:
            f.write(content)

        # Return local file URL (would be served by web server).
        # BUG FIX: previously returned the literal string "(unknown)" in
        # place of the filename, losing it from the URL.
        return f"/files/{file_id}/{filename}"

    async def _upload_s3(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to Amazon S3 (placeholder implementation)."""
        # Would use boto3
        # s3 = boto3.client('s3')
        # bucket = self.config.get('s3_bucket')
        # key = f"{file_id}/{filename}"
        # s3.put_object(Bucket=bucket, Key=key, Body=content)
        # return f"https://{bucket}.s3.amazonaws.com/{key}"
        return f"https://s3.example.com/{file_id}/{filename}"

    async def _upload_gcs(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to Google Cloud Storage (placeholder implementation)."""
        # Would use google-cloud-storage
        return f"https://storage.googleapis.com/bucket/{file_id}/{filename}"

    async def _upload_azure(
        self,
        file_id: str,
        filename: str,
        content: bytes
    ) -> str:
        """Upload to Azure Blob Storage (placeholder implementation)."""
        # Would use azure-storage-blob
        return f"https://account.blob.core.windows.net/container/{file_id}/{filename}"

    def _validate_for_channel(
        self,
        filename: str,
        file_size: int,
        channel: str
    ):
        """Validate file for channel restrictions.

        Raises:
            ValueError: If the file exceeds the channel's max size or has a
                disallowed extension. Files with no extension pass the
                extension check.
        """
        # Check size
        max_size = self.CHANNEL_MAX_SIZES.get(
            channel.lower(),
            self.CHANNEL_MAX_SIZES["default"]
        )
        if file_size > max_size:
            raise ValueError(
                f"File too large for {channel}: {file_size} > {max_size}"
            )

        # Check extension
        ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
        allowed = self.CHANNEL_ALLOWED_EXTENSIONS.get(
            channel.lower(),
            self.CHANNEL_ALLOWED_EXTENSIONS["default"]
        )
        if ext and ext not in allowed:
            raise ValueError(
                f"File type '{ext}' not allowed for {channel}"
            )

    async def get_info(
        self,
        file_id: str,
        channel: Optional[str] = None
    ) -> FileInfo:
        """
        Get information about a file.

        Args:
            file_id: File identifier
            channel: Channel context (for channel-specific info)

        Returns:
            FileInfo object

        Raises:
            FileNotFoundError: If the file is not tracked by this manager.
        """
        if file_id in self._files:
            return self._files[file_id]

        # Would query storage backend for file info
        raise FileNotFoundError(f"File not found: {file_id}")

    async def get_info_from_path(self, file_path: str) -> FileInfo:
        """
        Get file info from local path.

        Args:
            file_path: Path to file

        Returns:
            FileInfo object (checksum covers only the first 1KB of content)

        Raises:
            FileNotFoundError: If the path does not exist.
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        stat = path.stat()

        with open(path, 'rb') as f:
            content_start = f.read(1024)
            checksum = hashlib.md5(content_start).hexdigest()

        return FileInfo(
            file_id=self._generate_file_id(content_start, path.name),
            filename=path.name,
            size=stat.st_size,
            mime_type=self._get_mime_type(path.name),
            local_path=str(path),
            created_at=stat.st_ctime,
            checksum=checksum
        )

    def cleanup_temp(self, max_age_hours: int = 24) -> int:
        """
        Clean up temporary files older than max_age.

        Args:
            max_age_hours: Maximum age in hours

        Returns:
            Number of entries deleted (on-disk temp files plus pruned
            tracking entries).
        """
        deleted = 0
        max_age_seconds = max_age_hours * 3600
        cutoff_time = time.time() - max_age_seconds

        for dir_path in [self.temp_dir, TEMP_DIR]:
            try:
                path = Path(dir_path)
                if not path.exists():
                    continue

                for file_path in path.rglob("*"):
                    if file_path.is_file():
                        try:
                            if file_path.stat().st_mtime < cutoff_time:
                                file_path.unlink()
                                deleted += 1
                                logger.debug(f"Deleted temp file: {file_path}")
                        except (PermissionError, OSError) as e:
                            logger.warning(f"Could not delete {file_path}: {e}")

            except Exception as e:
                logger.error(f"Error cleaning temp directory {dir_path}: {e}")

        # Also prune tracked files that expired or aged out.
        expired_ids = [
            fid for fid, info in self._files.items()
            if info.is_expired() or (info.created_at and info.created_at < cutoff_time)
        ]
        for file_id in expired_ids:
            del self._files[file_id]
            deleted += 1

        logger.info(f"Cleaned up {deleted} temporary files")
        return deleted

    async def delete(self, file_id: str) -> bool:
        """
        Delete a file.

        Args:
            file_id: File identifier

        Returns:
            True if deleted successfully, False if the file was not tracked.
        """
        if file_id not in self._files:
            return False

        file_info = self._files[file_id]

        # Delete local file (best-effort: failure is logged, not raised)
        if file_info.local_path:
            try:
                Path(file_info.local_path).unlink(missing_ok=True)
            except Exception as e:
                logger.warning(f"Could not delete local file: {e}")

        # Delete from storage backend (placeholders — no remote deletion yet)
        if self.storage_backend == StorageBackend.S3:
            # Would delete from S3
            pass
        elif self.storage_backend == StorageBackend.GCS:
            # Would delete from GCS
            pass
        elif self.storage_backend == StorageBackend.AZURE:
            # Would delete from Azure
            pass

        # Remove from tracking
        del self._files[file_id]
        return True

    async def copy(
        self,
        source_path: str,
        dest_path: str
    ) -> str:
        """
        Copy a file (metadata-preserving, via shutil.copy2).

        Args:
            source_path: Source file path
            dest_path: Destination file path

        Returns:
            Destination path

        Raises:
            FileNotFoundError: If the source does not exist.
        """
        import shutil

        source = Path(source_path)
        if not source.exists():
            raise FileNotFoundError(f"Source not found: {source_path}")

        dest = Path(dest_path)
        dest.parent.mkdir(parents=True, exist_ok=True)

        shutil.copy2(source, dest)
        return str(dest)

    async def move(
        self,
        source_path: str,
        dest_path: str
    ) -> str:
        """
        Move a file.

        Args:
            source_path: Source file path
            dest_path: Destination file path

        Returns:
            Destination path

        Raises:
            FileNotFoundError: If the source does not exist.
        """
        import shutil

        source = Path(source_path)
        if not source.exists():
            raise FileNotFoundError(f"Source not found: {source_path}")

        dest = Path(dest_path)
        dest.parent.mkdir(parents=True, exist_ok=True)

        shutil.move(source, dest)
        return str(dest)

    def get_temp_path(self, prefix: str = "file", extension: str = "") -> str:
        """
        Get a temporary file path.

        Args:
            prefix: File name prefix
            extension: File extension (without dot)

        Returns:
            Temporary file path (Docker-compatible); the path is generated
            but the file is not created.
        """
        timestamp = int(time.time() * 1000)
        random_hash = hashlib.md5(str(timestamp).encode()).hexdigest()[:8]
        filename = f"{prefix}_{timestamp}_{random_hash}"
        if extension:
            filename += f".{extension}"
        return os.path.join(self.temp_dir, filename)

    def format_size(self, size: int) -> str:
        """Format size in human-readable format (B/KB/MB/GB, else TB)."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"

    def get_channel_limits(self, channel: str) -> Dict[str, Any]:
        """Get file limits for a channel (falls back to 'default' limits)."""
        max_size = self.CHANNEL_MAX_SIZES.get(
            channel.lower(),
            self.CHANNEL_MAX_SIZES["default"]
        )
        return {
            "max_size": max_size,
            "max_size_formatted": self.format_size(max_size),
            "allowed_extensions": self.CHANNEL_ALLOWED_EXTENSIONS.get(
                channel.lower(),
                self.CHANNEL_ALLOWED_EXTENSIONS["default"]
            )
        }

    def list_files(
        self,
        channel: Optional[str] = None,
        include_expired: bool = False
    ) -> List[FileInfo]:
        """
        List tracked files, newest first.

        Args:
            channel: Filter by channel
            include_expired: Include expired files

        Returns:
            List of FileInfo objects sorted by created_at descending
        """
        files = list(self._files.values())

        if channel:
            files = [f for f in files if f.channel == channel]

        if not include_expired:
            files = [f for f in files if not f.is_expired()]

        return sorted(files, key=lambda f: f.created_at, reverse=True)

    def get_storage_info(self) -> Dict[str, Any]:
        """Get information about storage configuration."""
        return {
            "backend": self.storage_backend.value,
            "temp_dir": self.temp_dir,
            "upload_dir": self.upload_dir,
            "tracked_files": len(self._files),
            "supported_channels": list(self.CHANNEL_MAX_SIZES.keys())
        }