Coverage for integrations / channels / memory / file_tracker.py: 77.4%
345 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2File Tracker - Monitor and index file changes.
4Provides file watching, change detection, and synchronization capabilities
5for the memory system. Designed for Docker environments with container-compatible paths.
6"""
8import asyncio
9import fnmatch
10import hashlib
11import json
12import os
13import sqlite3
14import threading
15import time
16from dataclasses import dataclass, field
17from datetime import datetime
18from enum import Enum
19from pathlib import Path
20from typing import Any, Callable, Dict, List, Optional, Set, Union
class ChangeType(Enum):
    """Enumeration of the kinds of filesystem events the tracker records."""

    # Member values are the serialized form stored in the database and in
    # JSON payloads (see FileChange.to_dict / from_dict).
    CREATED = "created"
    MODIFIED = "modified"
    DELETED = "deleted"
    RENAMED = "renamed"
@dataclass
class FileChange:
    """Represents a single file change event."""

    path: str
    change_type: ChangeType
    timestamp: datetime = field(default_factory=datetime.utcnow)
    old_path: Optional[str] = None  # Populated only for RENAMED events.
    size: int = 0
    content_hash: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this event into a plain, JSON-friendly dictionary."""
        return {
            "path": self.path,
            "change_type": self.change_type.value,
            "timestamp": self.timestamp.isoformat(),
            "old_path": self.old_path,
            "size": self.size,
            "content_hash": self.content_hash,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FileChange":
        """Rebuild a FileChange from a mapping produced by to_dict()."""
        ts = data["timestamp"]
        if isinstance(ts, str):
            # to_dict() emits ISO-8601 strings; raw datetimes pass through.
            ts = datetime.fromisoformat(ts)
        return cls(
            path=data["path"],
            change_type=ChangeType(data["change_type"]),
            timestamp=ts,
            old_path=data.get("old_path"),
            size=data.get("size", 0),
            content_hash=data.get("content_hash", ""),
            metadata=data.get("metadata", {}),
        )
@dataclass
class SyncResult:
    """Result of a file synchronization operation."""

    path: str
    success: bool
    files_added: int = 0
    files_modified: int = 0
    files_deleted: int = 0
    files_unchanged: int = 0
    errors: List[str] = field(default_factory=list)
    duration_ms: float = 0.0
    changes: List[FileChange] = field(default_factory=list)

    @property
    def total_changes(self) -> int:
        """Count of added + modified + deleted files (unchanged excluded)."""
        return sum((self.files_added, self.files_modified, self.files_deleted))

    def to_dict(self) -> Dict[str, Any]:
        """Serialize into a plain dictionary, nesting each change via its to_dict()."""
        payload: Dict[str, Any] = {
            "path": self.path,
            "success": self.success,
            "files_added": self.files_added,
            "files_modified": self.files_modified,
            "files_deleted": self.files_deleted,
            "files_unchanged": self.files_unchanged,
            "total_changes": self.total_changes,
            "errors": self.errors,
            "duration_ms": self.duration_ms,
        }
        payload["changes"] = [change.to_dict() for change in self.changes]
        return payload
@dataclass
class WatchConfig:
    """Configuration for file watching."""

    # Glob patterns a file name must match to be tracked ("*" = everything).
    patterns: List[str] = field(default_factory=lambda: ["*"])
    # Files whose name or full path matches any of these are skipped.
    ignore_patterns: List[str] = field(default_factory=lambda: [
        "*.pyc", "__pycache__", ".git", ".svn", "*.swp", "*.tmp",
        "node_modules", ".venv", "venv", "*.log"
    ])
    recursive: bool = True  # Descend into subdirectories.
    include_hidden: bool = False  # Skip dot-files unless enabled.
    max_file_size: int = 10 * 1024 * 1024  # Ignore files larger than 10MB.
    debounce_ms: int = 500  # Poll interval, in milliseconds.
    compute_hash: bool = True  # Hash contents to suppress no-op saves.
class FileWatcher:
    """
    Watches a directory for file changes.

    Provides real-time monitoring with debouncing and pattern filtering.
    Uses polling for Docker compatibility (inotify doesn't work across mounts).

    Thread-safety: the internal snapshot (``_file_states``) is only read or
    mutated while holding ``_lock``; ``on_change`` callbacks are invoked
    WITHOUT the lock held so they may safely call back into this watcher.
    """

    def __init__(
        self,
        path: str,
        config: Optional[WatchConfig] = None,
        on_change: Optional[Callable[[FileChange], None]] = None,
    ):
        """
        Initialize the file watcher.

        Args:
            path: Directory path to watch.
            config: Watch configuration.
            on_change: Callback for change events.
        """
        self.path = self._normalize_path(path)
        self.config = config or WatchConfig()
        self.on_change = on_change

        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.RLock()
        # path -> {"mtime": float, "size": int, "hash": str} snapshot used for diffing.
        self._file_states: Dict[str, Dict[str, Any]] = {}
        # NOTE(review): the two fields below are never read or written by this
        # class; kept for backward compatibility with external introspection.
        self._pending_changes: List[FileChange] = []
        self._last_debounce: float = 0

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Normalize a path for container compatibility (forward slashes, absolute)."""
        # Convert Windows paths to Unix-style for Docker.
        normalized = path.replace("\\", "/")
        if not normalized.startswith("/"):
            # Relative path - make absolute.
            normalized = os.path.abspath(path).replace("\\", "/")
        return normalized

    def start(self) -> None:
        """Start the background polling thread (no-op if already running)."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._scan_initial()
            self._thread = threading.Thread(target=self._poll_loop, daemon=True)
            self._thread.start()

    def stop(self) -> None:
        """Stop watching for file changes.

        The thread join happens OUTSIDE the lock: the poll thread may be
        inside an ``on_change`` callback that re-enters this watcher's locked
        API (e.g. ``get_watched_files``); joining while holding the lock
        could deadlock against it.
        """
        with self._lock:
            self._running = False
            thread, self._thread = self._thread, None
        if thread:
            thread.join(timeout=2.0)

    def _scan_initial(self) -> None:
        """Build the initial mtime/size/hash snapshot of the directory."""
        with self._lock:
            self._file_states.clear()
            for file_path in self._iter_files():
                try:
                    stat = os.stat(file_path)
                    self._file_states[file_path] = {
                        "mtime": stat.st_mtime,
                        "size": stat.st_size,
                        "hash": self._compute_hash(file_path) if self.config.compute_hash else "",
                    }
                except OSError:
                    # File disappeared between listing and stat; skip it.
                    pass

    def _poll_loop(self) -> None:
        """Main polling loop for detecting changes; runs on the daemon thread."""
        # Poll no faster than 100ms regardless of the configured debounce.
        poll_interval = max(0.1, self.config.debounce_ms / 1000.0)

        while self._running:
            try:
                self._check_for_changes()
            except Exception:
                pass  # Best-effort: a bad scan must not kill the loop.
            time.sleep(poll_interval)

    def _check_for_changes(self) -> None:
        """Diff the directory against the last snapshot and emit change events.

        The snapshot is updated under ``_lock``; callbacks fire after the
        lock is released.
        """
        current_files: Set[str] = set()
        changes: List[FileChange] = []

        with self._lock:
            for file_path in self._iter_files():
                current_files.add(file_path)
                try:
                    stat = os.stat(file_path)
                    current_state = {
                        "mtime": stat.st_mtime,
                        "size": stat.st_size,
                    }

                    if file_path not in self._file_states:
                        # New file.
                        content_hash = self._compute_hash(file_path) if self.config.compute_hash else ""
                        current_state["hash"] = content_hash
                        self._file_states[file_path] = current_state
                        changes.append(FileChange(
                            path=file_path,
                            change_type=ChangeType.CREATED,
                            size=stat.st_size,
                            content_hash=content_hash,
                        ))
                    elif (current_state["mtime"] != self._file_states[file_path]["mtime"] or
                          current_state["size"] != self._file_states[file_path]["size"]):
                        # Metadata changed; confirm via content hash so a save
                        # without an actual content change is not reported.
                        content_hash = self._compute_hash(file_path) if self.config.compute_hash else ""
                        current_state["hash"] = content_hash

                        if not self.config.compute_hash or content_hash != self._file_states[file_path].get("hash", ""):
                            self._file_states[file_path] = current_state
                            changes.append(FileChange(
                                path=file_path,
                                change_type=ChangeType.MODIFIED,
                                size=stat.st_size,
                                content_hash=content_hash,
                            ))
                        else:
                            # Same content; just refresh the stored mtime/size.
                            self._file_states[file_path] = current_state

                except OSError:
                    # File vanished mid-scan; the deletion pass will catch it
                    # on the next cycle.
                    pass

            # Files present in the snapshot but no longer on disk were deleted.
            deleted = set(self._file_states.keys()) - current_files
            for file_path in deleted:
                del self._file_states[file_path]
                changes.append(FileChange(
                    path=file_path,
                    change_type=ChangeType.DELETED,
                ))

        # Emit outside the lock so callbacks can safely re-enter this watcher.
        if changes and self.on_change:
            for change in changes:
                try:
                    self.on_change(change)
                except Exception:
                    pass  # A failing callback must not break delivery of the rest.

    def _iter_files(self):
        """Yield string paths of files matching the watch patterns/filters."""
        base_path = Path(self.path)
        if not base_path.exists():
            return

        if self.config.recursive:
            walker = base_path.rglob("*")
        else:
            walker = base_path.glob("*")

        for path in walker:
            if not path.is_file():
                continue

            str_path = str(path)

            # Ignore patterns match either the bare name or the full path.
            if any(fnmatch.fnmatch(path.name, pat) or fnmatch.fnmatch(str_path, pat)
                   for pat in self.config.ignore_patterns):
                continue

            # Hidden files (dot-prefixed names) are skipped unless enabled.
            if not self.config.include_hidden and path.name.startswith("."):
                continue

            # Include patterns apply only when not the catch-all default.
            if self.config.patterns != ["*"]:
                if not any(fnmatch.fnmatch(path.name, pat) for pat in self.config.patterns):
                    continue

            # Enforce the configured size cap; unreadable files are skipped.
            try:
                if path.stat().st_size > self.config.max_file_size:
                    continue
            except OSError:
                continue

            yield str_path

    def _compute_hash(self, file_path: str) -> str:
        """Compute the SHA256 hex digest of a file's content ("" on I/O error)."""
        try:
            hasher = hashlib.sha256()
            with open(file_path, "rb") as f:
                # Stream in 8KB chunks to bound memory on large files.
                for chunk in iter(lambda: f.read(8192), b""):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except OSError:
            return ""

    @property
    def is_running(self) -> bool:
        """Check if the watcher's poll loop is active."""
        return self._running

    def get_watched_files(self) -> List[str]:
        """Get a snapshot list of currently tracked file paths."""
        with self._lock:
            return list(self._file_states.keys())
class FileTracker:
    """
    Monitor and index file changes.

    Provides persistent tracking of file changes with SQLite storage,
    designed for Docker environments with proper file locking.

    Features:
    - Watch multiple directories with configurable patterns
    - Persistent change history with SQLite FTS5
    - Async sync operations
    - Change detection since arbitrary timestamps
    """

    # Bumped when the SQLite schema changes; stored in schema_meta.
    SCHEMA_VERSION = 1

    # Default paths for Docker environments
    DEFAULT_DATA_DIR = "/app/data"
    DEFAULT_TEMP_DIR = "/tmp/file_tracker"

    def __init__(
        self,
        db_path: Optional[Union[str, Path]] = None,
        data_dir: Optional[str] = None,
    ):
        """
        Initialize the file tracker.

        Args:
            db_path: Path to SQLite database. Uses temp directory if None.
            data_dir: Base data directory for relative paths.
        """
        # Determine data directory: explicit arg > Docker default > CWD.
        if data_dir:
            self.data_dir = data_dir
        elif os.path.exists(self.DEFAULT_DATA_DIR):
            self.data_dir = self.DEFAULT_DATA_DIR
        else:
            self.data_dir = os.path.abspath(".")

        # Determine database path; when not pinned, prefer /tmp and fall back
        # to a hidden ".tracker" dir under data_dir.
        if db_path:
            self.db_path = str(db_path)
        else:
            db_dir = self.DEFAULT_TEMP_DIR if os.path.exists("/tmp") else os.path.join(self.data_dir, ".tracker")
            os.makedirs(db_dir, exist_ok=True)
            self.db_path = os.path.join(db_dir, "file_tracker.db")

        self._lock = threading.RLock()
        self._conn: Optional[sqlite3.Connection] = None
        # normalized watch path -> active FileWatcher
        self._watchers: Dict[str, FileWatcher] = {}
        self._change_callbacks: List[Callable[[FileChange], None]] = []

        # Eagerly open the DB and create tables so later calls can't fail on
        # a missing schema.
        self._ensure_connection()
        self._ensure_schema()

    def _ensure_connection(self) -> sqlite3.Connection:
        """Ensure database connection with proper locking.

        Lazily opens a single shared connection; check_same_thread=False is
        required because watcher callbacks arrive on poll threads.
        """
        if self._conn is None:
            # Create parent directory if needed
            os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)

            self._conn = sqlite3.connect(
                self.db_path,
                check_same_thread=False,
                isolation_level=None,  # Auto-commit mode
                timeout=30.0,  # Wait up to 30s for lock
            )
            self._conn.row_factory = sqlite3.Row

            # Enable WAL mode for better concurrency
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA busy_timeout=30000")

        return self._conn

    def _ensure_schema(self) -> None:
        """Create database schema if not exists (idempotent)."""
        conn = self._ensure_connection()
        with self._lock:
            # File index table: one row per currently-tracked file.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tracked_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    path TEXT UNIQUE NOT NULL,
                    watch_path TEXT NOT NULL,
                    size INTEGER DEFAULT 0,
                    content_hash TEXT,
                    first_seen REAL,
                    last_modified REAL,
                    last_synced REAL,
                    metadata TEXT DEFAULT '{}'
                )
            """)

            # Change history table: append-only event log.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS file_changes (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    path TEXT NOT NULL,
                    change_type TEXT NOT NULL,
                    timestamp REAL NOT NULL,
                    old_path TEXT,
                    size INTEGER DEFAULT 0,
                    content_hash TEXT,
                    metadata TEXT DEFAULT '{}'
                )
            """)

            # Watch configurations table: persisted WatchConfig per path.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS watch_configs (
                    path TEXT PRIMARY KEY,
                    config TEXT NOT NULL,
                    enabled INTEGER DEFAULT 1,
                    created_at REAL,
                    updated_at REAL
                )
            """)

            # Create FTS5 table for content search (if available).
            # NOTE(review): this declares external content=tracked_files, but
            # tracked_files has no `content` column — nothing in this file
            # populates or queries the FTS table; verify before relying on it.
            try:
                conn.execute("""
                    CREATE VIRTUAL TABLE IF NOT EXISTS file_content_fts USING fts5(
                        path,
                        content,
                        content=tracked_files
                    )
                """)
                self._fts_available = True
            except sqlite3.OperationalError:
                # SQLite build without FTS5 support.
                self._fts_available = False

            # Create indexes
            conn.execute("CREATE INDEX IF NOT EXISTS idx_changes_path ON file_changes(path)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_changes_timestamp ON file_changes(timestamp)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_files_watch ON tracked_files(watch_path)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_files_hash ON tracked_files(content_hash)")

            # Schema version
            conn.execute("""
                CREATE TABLE IF NOT EXISTS schema_meta (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )
            """)
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta (key, value) VALUES (?, ?)",
                ("version", str(self.SCHEMA_VERSION))
            )

    def watch(self, path: str, patterns: Optional[List[str]] = None) -> None:
        """
        Start watching a directory for changes.

        Args:
            path: Directory path to watch.
            patterns: Optional list of glob patterns to match (e.g., ["*.py", "*.json"]).
        """
        normalized_path = FileWatcher._normalize_path(path)

        with self._lock:
            if normalized_path in self._watchers:
                return  # Already watching

            config = WatchConfig(patterns=patterns or ["*"])

            # Save config to database; COALESCE preserves the original
            # created_at on re-watch.
            conn = self._ensure_connection()
            now = time.time()
            conn.execute(
                """
                INSERT OR REPLACE INTO watch_configs (path, config, enabled, created_at, updated_at)
                VALUES (?, ?, 1, COALESCE((SELECT created_at FROM watch_configs WHERE path = ?), ?), ?)
                """,
                (normalized_path, json.dumps(config.__dict__), normalized_path, now, now)
            )

            # Create and start watcher; change events route to _on_file_change.
            watcher = FileWatcher(
                path=normalized_path,
                config=config,
                on_change=self._on_file_change,
            )
            self._watchers[normalized_path] = watcher
            watcher.start()

    def unwatch(self, path: str) -> None:
        """
        Stop watching a directory.

        Args:
            path: Directory path to stop watching.
        """
        normalized_path = FileWatcher._normalize_path(path)

        with self._lock:
            if normalized_path in self._watchers:
                self._watchers[normalized_path].stop()
                del self._watchers[normalized_path]

            # Mark as disabled in database (row kept for history/re-enable).
            conn = self._ensure_connection()
            conn.execute(
                "UPDATE watch_configs SET enabled = 0, updated_at = ? WHERE path = ?",
                (time.time(), normalized_path)
            )

    async def sync(self, path: str) -> SyncResult:
        """
        Synchronize file index with filesystem.

        Performs a full scan and updates the database with current state.

        NOTE(review): declared async but contains no awaits — all filesystem
        and SQLite work here is blocking and will stall the event loop on
        large trees; consider run_in_executor if that matters to callers.

        Args:
            path: Directory path to sync.

        Returns:
            SyncResult with statistics and changes.
        """
        start_time = time.time()
        normalized_path = FileWatcher._normalize_path(path)

        result = SyncResult(path=normalized_path, success=False)

        try:
            # Get current state from database
            conn = self._ensure_connection()
            with self._lock:
                rows = conn.execute(
                    "SELECT path, content_hash, size FROM tracked_files WHERE watch_path = ?",
                    (normalized_path,)
                ).fetchall()

            db_files: Dict[str, Dict[str, Any]] = {
                row["path"]: {"hash": row["content_hash"], "size": row["size"]}
                for row in rows
            }

            # Scan filesystem: a throwaway FileWatcher is used purely for its
            # _iter_files filtering and _compute_hash helpers (never started).
            config = WatchConfig()
            watcher = FileWatcher(path=normalized_path, config=config)
            current_files: Set[str] = set()

            for file_path in watcher._iter_files():
                current_files.add(file_path)

                try:
                    stat = os.stat(file_path)
                    content_hash = watcher._compute_hash(file_path)

                    if file_path not in db_files:
                        # New file
                        result.files_added += 1
                        change = FileChange(
                            path=file_path,
                            change_type=ChangeType.CREATED,
                            size=stat.st_size,
                            content_hash=content_hash,
                        )
                        result.changes.append(change)
                        self._record_change(change)
                        self._update_file_record(file_path, normalized_path, stat.st_size, content_hash)

                    elif (content_hash != db_files[file_path]["hash"] or
                          stat.st_size != db_files[file_path]["size"]):
                        # Modified file (hash or size differs from DB record)
                        result.files_modified += 1
                        change = FileChange(
                            path=file_path,
                            change_type=ChangeType.MODIFIED,
                            size=stat.st_size,
                            content_hash=content_hash,
                        )
                        result.changes.append(change)
                        self._record_change(change)
                        self._update_file_record(file_path, normalized_path, stat.st_size, content_hash)
                    else:
                        result.files_unchanged += 1

                except OSError as e:
                    # Per-file errors are collected, not fatal to the sync.
                    result.errors.append(f"Error reading {file_path}: {e}")

            # Check for deleted files: in DB but no longer on disk.
            deleted = set(db_files.keys()) - current_files
            for file_path in deleted:
                result.files_deleted += 1
                change = FileChange(path=file_path, change_type=ChangeType.DELETED)
                result.changes.append(change)
                self._record_change(change)
                self._delete_file_record(file_path)

            result.success = True

        except Exception as e:
            # Any unexpected failure is reported in the result, not raised.
            result.errors.append(f"Sync failed: {e}")

        result.duration_ms = (time.time() - start_time) * 1000
        return result

    def get_changes(self, since: datetime) -> List[FileChange]:
        """
        Get all file changes since a given timestamp.

        Args:
            since: Get changes after this datetime.
                   NOTE(review): converted via .timestamp(), and rows come
                   back via fromtimestamp() — both naive/local-time; mixing
                   aware datetimes here would shift results. Confirm callers.

        Returns:
            List of FileChange objects, oldest first.
        """
        timestamp = since.timestamp()

        conn = self._ensure_connection()
        with self._lock:
            rows = conn.execute(
                """
                SELECT path, change_type, timestamp, old_path, size, content_hash, metadata
                FROM file_changes
                WHERE timestamp > ?
                ORDER BY timestamp ASC
                """,
                (timestamp,)
            ).fetchall()

        # Rehydrate rows into FileChange objects; NULL columns get defaults.
        return [
            FileChange(
                path=row["path"],
                change_type=ChangeType(row["change_type"]),
                timestamp=datetime.fromtimestamp(row["timestamp"]),
                old_path=row["old_path"],
                size=row["size"] or 0,
                content_hash=row["content_hash"] or "",
                metadata=json.loads(row["metadata"]) if row["metadata"] else {},
            )
            for row in rows
        ]

    def get_tracked_files(self, watch_path: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get list of tracked files.

        Args:
            watch_path: Optional filter by watch path.

        Returns:
            List of file records (one dict per tracked_files row).
        """
        conn = self._ensure_connection()
        with self._lock:
            if watch_path:
                normalized = FileWatcher._normalize_path(watch_path)
                rows = conn.execute(
                    "SELECT * FROM tracked_files WHERE watch_path = ?",
                    (normalized,)
                ).fetchall()
            else:
                rows = conn.execute("SELECT * FROM tracked_files").fetchall()

        return [dict(row) for row in rows]

    def get_watched_paths(self) -> List[str]:
        """Get list of currently watched (normalized) paths."""
        with self._lock:
            return list(self._watchers.keys())

    def add_change_callback(self, callback: Callable[[FileChange], None]) -> None:
        """Add a callback for file change events."""
        self._change_callbacks.append(callback)

    def remove_change_callback(self, callback: Callable[[FileChange], None]) -> None:
        """Remove a change callback (no-op if not registered)."""
        if callback in self._change_callbacks:
            self._change_callbacks.remove(callback)

    def _on_file_change(self, change: FileChange) -> None:
        """Handle a file change event from a watcher (runs on its poll thread)."""
        self._record_change(change)

        # Update file record
        if change.change_type == ChangeType.DELETED:
            self._delete_file_record(change.path)
        else:
            # Find which watch path this belongs to.
            # NOTE(review): startswith is a pure string-prefix test, so
            # "/a/bc" would match watch path "/a/b" — verify paths end with
            # a separator or that overlapping watch roots never occur.
            watch_path = None
            for wp in self._watchers.keys():
                if change.path.startswith(wp):
                    watch_path = wp
                    break
            if watch_path:
                self._update_file_record(change.path, watch_path, change.size, change.content_hash)

        # Notify callbacks; a failing callback must not block the others.
        for callback in self._change_callbacks:
            try:
                callback(change)
            except Exception:
                pass

    def _record_change(self, change: FileChange) -> None:
        """Append a change event to the file_changes history table."""
        conn = self._ensure_connection()
        with self._lock:
            conn.execute(
                """
                INSERT INTO file_changes (path, change_type, timestamp, old_path, size, content_hash, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    change.path,
                    change.change_type.value,
                    change.timestamp.timestamp(),
                    change.old_path,
                    change.size,
                    change.content_hash,
                    json.dumps(change.metadata),
                )
            )

    def _update_file_record(self, path: str, watch_path: str, size: int, content_hash: str) -> None:
        """Update or insert a tracked_files record (first_seen preserved)."""
        conn = self._ensure_connection()
        now = time.time()
        with self._lock:
            # COALESCE keeps the original first_seen when the row already exists.
            conn.execute(
                """
                INSERT OR REPLACE INTO tracked_files
                (path, watch_path, size, content_hash, first_seen, last_modified, last_synced)
                VALUES (?, ?, ?, ?,
                        COALESCE((SELECT first_seen FROM tracked_files WHERE path = ?), ?),
                        ?, ?)
                """,
                (path, watch_path, size, content_hash, path, now, now, now)
            )

    def _delete_file_record(self, path: str) -> None:
        """Delete a tracked_files record (history rows are kept)."""
        conn = self._ensure_connection()
        with self._lock:
            conn.execute("DELETE FROM tracked_files WHERE path = ?", (path,))

    def cleanup_old_changes(self, days: int = 30) -> int:
        """
        Remove change records older than specified days.

        Args:
            days: Maximum age of records to keep.

        Returns:
            Number of records deleted.
        """
        cutoff = time.time() - (days * 24 * 3600)
        conn = self._ensure_connection()
        with self._lock:
            cursor = conn.execute(
                "DELETE FROM file_changes WHERE timestamp < ?",
                (cutoff,)
            )
            return cursor.rowcount

    def close(self) -> None:
        """Stop all watchers and close database connection."""
        with self._lock:
            for watcher in self._watchers.values():
                watcher.stop()
            self._watchers.clear()

            if self._conn:
                self._conn.close()
                self._conn = None

    def __enter__(self) -> "FileTracker":
        # Context-manager entry: returns the tracker itself.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Always close; returning False propagates any in-flight exception.
        self.close()
        return False