Coverage for integrations / channels / memory / file_tracker.py: 77.4%

345 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2File Tracker - Monitor and index file changes. 

3 

4Provides file watching, change detection, and synchronization capabilities 

5for the memory system. Designed for Docker environments with container-compatible paths. 

6""" 

7 

import asyncio
import fnmatch
import hashlib
import json
import os
import sqlite3
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union

21 

22 

class ChangeType(Enum):
    """Types of file changes detected by the tracker.

    The string values are what gets persisted in the ``file_changes`` table
    and emitted by ``FileChange.to_dict()``.
    """
    CREATED = "created"    # file appeared that was not previously tracked
    MODIFIED = "modified"  # tracked file's mtime/size changed (and hash, when computed)
    DELETED = "deleted"    # previously tracked file no longer exists on disk
    # NOTE(review): no code path visible in this file ever emits RENAMED;
    # it appears reserved for future rename detection (see FileChange.old_path).
    RENAMED = "renamed"

29 

30 

@dataclass
class FileChange:
    """Represents a single file change event.

    Instances round-trip through ``to_dict``/``from_dict`` and are persisted
    via ``FileTracker._record_change``.
    """

    path: str
    change_type: ChangeType
    # Timezone-aware UTC timestamp. The previous default, datetime.utcnow(),
    # is deprecated and returns a *naive* datetime; calling .timestamp() on a
    # naive value (as FileTracker._record_change does) interprets it in local
    # time, skewing stored epochs on any non-UTC host. An aware UTC datetime
    # makes .timestamp() unambiguous.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    old_path: Optional[str] = None  # For renames
    size: int = 0
    content_hash: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serializable dictionary representation."""
        return {
            "path": self.path,
            "change_type": self.change_type.value,
            "timestamp": self.timestamp.isoformat(),
            "old_path": self.old_path,
            "size": self.size,
            "content_hash": self.content_hash,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FileChange":
        """Create from a dictionary representation.

        ``timestamp`` may be either an ISO-8601 string (as produced by
        ``to_dict``) or an already-constructed ``datetime``.
        """
        raw_ts = data["timestamp"]
        return cls(
            path=data["path"],
            change_type=ChangeType(data["change_type"]),
            timestamp=datetime.fromisoformat(raw_ts) if isinstance(raw_ts, str) else raw_ts,
            old_path=data.get("old_path"),
            size=data.get("size", 0),
            content_hash=data.get("content_hash", ""),
            metadata=data.get("metadata", {}),
        )

67 

68 

@dataclass
class SyncResult:
    """Outcome of one file-synchronization pass over a watch path."""

    path: str
    success: bool
    files_added: int = 0
    files_modified: int = 0
    files_deleted: int = 0
    files_unchanged: int = 0
    errors: List[str] = field(default_factory=list)
    duration_ms: float = 0.0
    changes: List[FileChange] = field(default_factory=list)

    @property
    def total_changes(self) -> int:
        """Total number of additions, modifications, and deletions."""
        return sum((self.files_added, self.files_modified, self.files_deleted))

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serializable dictionary representation."""
        summary: Dict[str, Any] = {
            "path": self.path,
            "success": self.success,
            "files_added": self.files_added,
            "files_modified": self.files_modified,
            "files_deleted": self.files_deleted,
            "files_unchanged": self.files_unchanged,
            "total_changes": self.total_changes,
            "errors": self.errors,
            "duration_ms": self.duration_ms,
        }
        summary["changes"] = [change.to_dict() for change in self.changes]
        return summary

102 

103 

# Patterns excluded by default: bytecode, VCS metadata, editor temp files,
# dependency/virtualenv directories, and logs.
_DEFAULT_IGNORES = (
    "*.pyc", "__pycache__", ".git", ".svn", "*.swp", "*.tmp",
    "node_modules", ".venv", "venv", "*.log",
)


@dataclass
class WatchConfig:
    """Tunable settings controlling which files a watcher tracks."""

    # Glob patterns a file's basename must match; ["*"] means everything.
    patterns: List[str] = field(default_factory=lambda: ["*"])
    # Glob patterns excluded from watching (checked against name and full path).
    ignore_patterns: List[str] = field(default_factory=lambda: list(_DEFAULT_IGNORES))
    recursive: bool = True            # descend into subdirectories
    include_hidden: bool = False      # skip dot-files unless True
    max_file_size: int = 10 * 1024 * 1024  # 10MB cap; larger files are skipped
    debounce_ms: int = 500            # lower bound on the polling interval
    compute_hash: bool = True         # hash contents to suppress no-op saves

118 

119 

class FileWatcher:
    """
    Watches a directory for file changes.

    Provides real-time monitoring with debouncing and pattern filtering.
    Uses polling for Docker compatibility (inotify doesn't work across mounts).

    Thread model: ``start()`` launches a daemon poll thread; ``_file_states``
    is shared between that thread and callers of ``get_watched_files`` and is
    guarded by ``_lock``.
    """

    def __init__(
        self,
        path: str,
        config: Optional[WatchConfig] = None,
        on_change: Optional[Callable[[FileChange], None]] = None,
    ):
        """
        Initialize the file watcher.

        Args:
            path: Directory path to watch.
            config: Watch configuration (a default WatchConfig if None).
            on_change: Callback invoked once per detected change.
        """
        self.path = self._normalize_path(path)
        self.config = config or WatchConfig()
        self.on_change = on_change

        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Re-entrant lock guarding _file_states: read on the caller's thread
        # (get_watched_files), mutated on the poll thread.
        self._lock = threading.RLock()
        self._file_states: Dict[str, Dict[str, Any]] = {}
        # NOTE(review): reserved for batched debouncing — nothing in this
        # class writes to these two fields yet.
        self._pending_changes: List[FileChange] = []
        self._last_debounce: float = 0

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Normalize a path for container compatibility.

        Converts backslashes to forward slashes and makes relative paths
        absolute so paths compare consistently across host and container.
        """
        normalized = path.replace("\\", "/")
        if not normalized.startswith("/"):
            # Relative path - make absolute
            normalized = os.path.abspath(path).replace("\\", "/")
        return normalized

    def start(self) -> None:
        """Start watching for file changes (idempotent)."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._scan_initial()
            self._thread = threading.Thread(target=self._poll_loop, daemon=True)
            self._thread.start()

    def stop(self) -> None:
        """Stop watching for file changes.

        The join happens *outside* the lock: the poll thread acquires the
        same lock while scanning, so joining while holding it could stall
        for the full join timeout instead of stopping promptly.
        """
        with self._lock:
            self._running = False
            thread, self._thread = self._thread, None
        if thread:
            thread.join(timeout=2.0)

    def _scan_initial(self) -> None:
        """Record a baseline snapshot of all matching files (no events emitted)."""
        with self._lock:
            self._file_states.clear()
            for file_path in self._iter_files():
                try:
                    stat = os.stat(file_path)
                except OSError:
                    # File disappeared between listing and stat; skip it.
                    continue
                self._file_states[file_path] = {
                    "mtime": stat.st_mtime,
                    "size": stat.st_size,
                    "hash": self._compute_hash(file_path) if self.config.compute_hash else "",
                }

    def _poll_loop(self) -> None:
        """Main polling loop for detecting changes; runs on the daemon thread."""
        poll_interval = max(0.1, self.config.debounce_ms / 1000.0)

        while self._running:
            try:
                self._check_for_changes()
            except Exception:
                pass  # best-effort: a transient scan error must not kill the loop
            time.sleep(poll_interval)

    def _check_for_changes(self) -> None:
        """Diff the filesystem against the cached state and emit change events.

        State mutation happens under the lock (it races with
        ``get_watched_files`` otherwise); callbacks fire after the lock is
        released so a slow handler cannot block readers.
        """
        changes: List[FileChange] = []

        with self._lock:
            current_files: Set[str] = set()
            for file_path in self._iter_files():
                current_files.add(file_path)
                try:
                    stat = os.stat(file_path)
                except OSError:
                    continue  # vanished mid-scan; deletion is reported next pass

                current_state: Dict[str, Any] = {
                    "mtime": stat.st_mtime,
                    "size": stat.st_size,
                }
                previous = self._file_states.get(file_path)

                if previous is None:
                    # New file
                    content_hash = self._compute_hash(file_path) if self.config.compute_hash else ""
                    current_state["hash"] = content_hash
                    self._file_states[file_path] = current_state
                    changes.append(FileChange(
                        path=file_path,
                        change_type=ChangeType.CREATED,
                        size=stat.st_size,
                        content_hash=content_hash,
                    ))
                elif (current_state["mtime"] != previous["mtime"] or
                      current_state["size"] != previous["size"]):
                    # Metadata changed; confirm via content hash when enabled.
                    content_hash = self._compute_hash(file_path) if self.config.compute_hash else ""
                    current_state["hash"] = content_hash

                    # Only report if the hash differs (a save that rewrites
                    # identical bytes bumps mtime but not content).
                    if not self.config.compute_hash or content_hash != previous.get("hash", ""):
                        self._file_states[file_path] = current_state
                        changes.append(FileChange(
                            path=file_path,
                            change_type=ChangeType.MODIFIED,
                            size=stat.st_size,
                            content_hash=content_hash,
                        ))
                    else:
                        self._file_states[file_path] = current_state

            # Anything previously tracked but no longer present was deleted.
            for file_path in set(self._file_states.keys()) - current_files:
                del self._file_states[file_path]
                changes.append(FileChange(
                    path=file_path,
                    change_type=ChangeType.DELETED,
                ))

        # Emit outside the lock; swallow per-callback errors (best-effort).
        if changes and self.on_change:
            for change in changes:
                try:
                    self.on_change(change)
                except Exception:
                    pass

    def _iter_files(self):
        """Yield string paths of files matching the watch configuration.

        Filters applied in order: ignore patterns (against basename and full
        path), hidden-file exclusion, include patterns (basename only), and
        the max-file-size cap.
        """
        base_path = Path(self.path)
        if not base_path.exists():
            return

        if self.config.recursive:
            walker = base_path.rglob("*")
        else:
            walker = base_path.glob("*")

        for path in walker:
            if not path.is_file():
                continue

            str_path = str(path)

            # Check ignore patterns
            if any(fnmatch.fnmatch(path.name, pat) or fnmatch.fnmatch(str_path, pat)
                   for pat in self.config.ignore_patterns):
                continue

            # Check hidden files
            if not self.config.include_hidden and path.name.startswith("."):
                continue

            # Check include patterns
            if self.config.patterns != ["*"]:
                if not any(fnmatch.fnmatch(path.name, pat) for pat in self.config.patterns):
                    continue

            # Check file size
            try:
                if path.stat().st_size > self.config.max_file_size:
                    continue
            except OSError:
                continue

            yield str_path

    def _compute_hash(self, file_path: str) -> str:
        """Compute the SHA256 hex digest of a file; empty string on read error."""
        try:
            hasher = hashlib.sha256()
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(8192), b""):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except OSError:
            return ""

    @property
    def is_running(self) -> bool:
        """Check if the watcher's poll loop is active."""
        return self._running

    def get_watched_files(self) -> List[str]:
        """Get a point-in-time list of currently tracked file paths."""
        with self._lock:
            return list(self._file_states.keys())

330 

331 

class FileTracker:
    """
    Monitor and index file changes.

    Provides persistent tracking of file changes with SQLite storage,
    designed for Docker environments with proper file locking.

    Features:
    - Watch multiple directories with configurable patterns
    - Persistent change history with SQLite FTS5
    - Async sync operations
    - Change detection since arbitrary timestamps

    Thread model: one shared SQLite connection (check_same_thread=False)
    guarded by ``_lock``; FileWatcher poll threads call back into
    ``_on_file_change``, so all DB helpers take the lock.
    """

    SCHEMA_VERSION = 1

    # Default paths for Docker environments
    DEFAULT_DATA_DIR = "/app/data"
    DEFAULT_TEMP_DIR = "/tmp/file_tracker"

    def __init__(
        self,
        db_path: Optional[Union[str, Path]] = None,
        data_dir: Optional[str] = None,
    ):
        """
        Initialize the file tracker.

        Args:
            db_path: Path to SQLite database. Uses temp directory if None.
            data_dir: Base data directory for relative paths.
        """
        # Determine data directory: explicit arg > Docker mount > current dir.
        if data_dir:
            self.data_dir = data_dir
        elif os.path.exists(self.DEFAULT_DATA_DIR):
            self.data_dir = self.DEFAULT_DATA_DIR
        else:
            self.data_dir = os.path.abspath(".")

        # Determine database path: explicit arg, else /tmp (if present),
        # else a hidden directory under data_dir.
        if db_path:
            self.db_path = str(db_path)
        else:
            db_dir = self.DEFAULT_TEMP_DIR if os.path.exists("/tmp") else os.path.join(self.data_dir, ".tracker")
            os.makedirs(db_dir, exist_ok=True)
            self.db_path = os.path.join(db_dir, "file_tracker.db")

        self._lock = threading.RLock()
        self._conn: Optional[sqlite3.Connection] = None
        self._watchers: Dict[str, FileWatcher] = {}  # normalized path -> active watcher
        self._change_callbacks: List[Callable[[FileChange], None]] = []

        # Open the DB and create tables eagerly so later calls can't fail on
        # a missing schema.
        self._ensure_connection()
        self._ensure_schema()

    def _ensure_connection(self) -> sqlite3.Connection:
        """Lazily open (and cache) the shared SQLite connection.

        Returns:
            The process-wide connection for this tracker.
        """
        if self._conn is None:
            # Create parent directory if needed
            os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)

            # check_same_thread=False: the connection is shared with watcher
            # threads; serialization is provided by self._lock, not sqlite.
            self._conn = sqlite3.connect(
                self.db_path,
                check_same_thread=False,
                isolation_level=None,  # Auto-commit mode
                timeout=30.0,  # Wait up to 30s for lock
            )
            self._conn.row_factory = sqlite3.Row

            # Enable WAL mode for better concurrency
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA busy_timeout=30000")

        return self._conn

    def _ensure_schema(self) -> None:
        """Create database schema if not exists (idempotent)."""
        conn = self._ensure_connection()
        with self._lock:
            # File index table: one row per currently-tracked file.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tracked_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    path TEXT UNIQUE NOT NULL,
                    watch_path TEXT NOT NULL,
                    size INTEGER DEFAULT 0,
                    content_hash TEXT,
                    first_seen REAL,
                    last_modified REAL,
                    last_synced REAL,
                    metadata TEXT DEFAULT '{}'
                )
            """)

            # Change history table: append-only event log (epoch timestamps).
            conn.execute("""
                CREATE TABLE IF NOT EXISTS file_changes (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    path TEXT NOT NULL,
                    change_type TEXT NOT NULL,
                    timestamp REAL NOT NULL,
                    old_path TEXT,
                    size INTEGER DEFAULT 0,
                    content_hash TEXT,
                    metadata TEXT DEFAULT '{}'
                )
            """)

            # Watch configurations table: persisted WatchConfig per path.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS watch_configs (
                    path TEXT PRIMARY KEY,
                    config TEXT NOT NULL,
                    enabled INTEGER DEFAULT 1,
                    created_at REAL,
                    updated_at REAL
                )
            """)

            # Create FTS5 table for content search (if available).
            # NOTE(review): this external-content FTS table is created but no
            # method in this class inserts into it or defines the usual
            # content-sync triggers — confirm whether search is wired up
            # elsewhere or this is dead schema.
            try:
                conn.execute("""
                    CREATE VIRTUAL TABLE IF NOT EXISTS file_content_fts USING fts5(
                        path,
                        content,
                        content=tracked_files
                    )
                """)
                self._fts_available = True
            except sqlite3.OperationalError:
                # Older SQLite builds may lack FTS5; degrade gracefully.
                self._fts_available = False

            # Create indexes
            conn.execute("CREATE INDEX IF NOT EXISTS idx_changes_path ON file_changes(path)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_changes_timestamp ON file_changes(timestamp)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_files_watch ON tracked_files(watch_path)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_files_hash ON tracked_files(content_hash)")

            # Schema version
            conn.execute("""
                CREATE TABLE IF NOT EXISTS schema_meta (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )
            """)
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta (key, value) VALUES (?, ?)",
                ("version", str(self.SCHEMA_VERSION))
            )

    def watch(self, path: str, patterns: Optional[List[str]] = None) -> None:
        """
        Start watching a directory for changes.

        Persists the watch config, then spins up a polling FileWatcher whose
        events flow into ``_on_file_change``. No-op if already watching.

        Args:
            path: Directory path to watch.
            patterns: Optional list of glob patterns to match (e.g., ["*.py", "*.json"]).
        """
        normalized_path = FileWatcher._normalize_path(path)

        with self._lock:
            if normalized_path in self._watchers:
                return  # Already watching

            config = WatchConfig(patterns=patterns or ["*"])

            # Save config to database. COALESCE preserves the original
            # created_at when re-enabling a previously watched path.
            conn = self._ensure_connection()
            now = time.time()
            conn.execute(
                """
                INSERT OR REPLACE INTO watch_configs (path, config, enabled, created_at, updated_at)
                VALUES (?, ?, 1, COALESCE((SELECT created_at FROM watch_configs WHERE path = ?), ?), ?)
                """,
                (normalized_path, json.dumps(config.__dict__), normalized_path, now, now)
            )

            # Create and start watcher
            watcher = FileWatcher(
                path=normalized_path,
                config=config,
                on_change=self._on_file_change,
            )
            self._watchers[normalized_path] = watcher
            watcher.start()

    def unwatch(self, path: str) -> None:
        """
        Stop watching a directory.

        Stops and drops the in-memory watcher; the persisted config row is
        kept but flagged disabled (so created_at survives a later re-watch).

        Args:
            path: Directory path to stop watching.
        """
        normalized_path = FileWatcher._normalize_path(path)

        with self._lock:
            if normalized_path in self._watchers:
                self._watchers[normalized_path].stop()
                del self._watchers[normalized_path]

            # Mark as disabled in database
            conn = self._ensure_connection()
            conn.execute(
                "UPDATE watch_configs SET enabled = 0, updated_at = ? WHERE path = ?",
                (time.time(), normalized_path)
            )

    async def sync(self, path: str) -> SyncResult:
        """
        Synchronize file index with filesystem.

        Performs a full scan and updates the database with current state.

        NOTE(review): declared ``async`` but contains no awaits — the whole
        scan (including per-file SHA256 hashing) runs synchronously on the
        event loop. Consider run_in_executor for large trees; confirm callers'
        expectations before changing.

        Args:
            path: Directory path to sync.

        Returns:
            SyncResult with statistics and changes.
        """
        start_time = time.time()
        normalized_path = FileWatcher._normalize_path(path)

        result = SyncResult(path=normalized_path, success=False)

        try:
            # Get current state from database
            conn = self._ensure_connection()
            with self._lock:
                rows = conn.execute(
                    "SELECT path, content_hash, size FROM tracked_files WHERE watch_path = ?",
                    (normalized_path,)
                ).fetchall()

            # Snapshot of what the DB believes exists under this watch path.
            db_files: Dict[str, Dict[str, Any]] = {
                row["path"]: {"hash": row["content_hash"], "size": row["size"]}
                for row in rows
            }

            # Scan filesystem. A throwaway (never started) FileWatcher is used
            # purely for its _iter_files / _compute_hash helpers.
            config = WatchConfig()
            watcher = FileWatcher(path=normalized_path, config=config)
            current_files: Set[str] = set()

            for file_path in watcher._iter_files():
                current_files.add(file_path)

                try:
                    stat = os.stat(file_path)
                    content_hash = watcher._compute_hash(file_path)

                    if file_path not in db_files:
                        # New file
                        result.files_added += 1
                        change = FileChange(
                            path=file_path,
                            change_type=ChangeType.CREATED,
                            size=stat.st_size,
                            content_hash=content_hash,
                        )
                        result.changes.append(change)
                        self._record_change(change)
                        self._update_file_record(file_path, normalized_path, stat.st_size, content_hash)

                    elif (content_hash != db_files[file_path]["hash"] or
                          stat.st_size != db_files[file_path]["size"]):
                        # Modified file
                        result.files_modified += 1
                        change = FileChange(
                            path=file_path,
                            change_type=ChangeType.MODIFIED,
                            size=stat.st_size,
                            content_hash=content_hash,
                        )
                        result.changes.append(change)
                        self._record_change(change)
                        self._update_file_record(file_path, normalized_path, stat.st_size, content_hash)
                    else:
                        result.files_unchanged += 1

                except OSError as e:
                    # Per-file errors are accumulated; the sync continues.
                    result.errors.append(f"Error reading {file_path}: {e}")

            # Check for deleted files: in DB but no longer on disk.
            deleted = set(db_files.keys()) - current_files
            for file_path in deleted:
                result.files_deleted += 1
                change = FileChange(path=file_path, change_type=ChangeType.DELETED)
                result.changes.append(change)
                self._record_change(change)
                self._delete_file_record(file_path)

            result.success = True

        except Exception as e:
            # Catch-all boundary: result.success stays False, reason recorded.
            result.errors.append(f"Sync failed: {e}")

        result.duration_ms = (time.time() - start_time) * 1000
        return result

    def get_changes(self, since: datetime) -> List[FileChange]:
        """
        Get all file changes since a given timestamp.

        Args:
            since: Get changes after this datetime (exclusive).

        Returns:
            List of FileChange objects, oldest first. Note that
            ``datetime.fromtimestamp`` without a tz argument yields naive
            local-time datetimes on the returned objects.
        """
        timestamp = since.timestamp()

        conn = self._ensure_connection()
        with self._lock:
            rows = conn.execute(
                """
                SELECT path, change_type, timestamp, old_path, size, content_hash, metadata
                FROM file_changes
                WHERE timestamp > ?
                ORDER BY timestamp ASC
                """,
                (timestamp,)
            ).fetchall()

        return [
            FileChange(
                path=row["path"],
                change_type=ChangeType(row["change_type"]),
                timestamp=datetime.fromtimestamp(row["timestamp"]),
                old_path=row["old_path"],
                size=row["size"] or 0,
                content_hash=row["content_hash"] or "",
                metadata=json.loads(row["metadata"]) if row["metadata"] else {},
            )
            for row in rows
        ]

    def get_tracked_files(self, watch_path: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get list of tracked files.

        Args:
            watch_path: Optional filter by watch path (normalized before use).

        Returns:
            List of file records as plain dicts (one per tracked_files row).
        """
        conn = self._ensure_connection()
        with self._lock:
            if watch_path:
                normalized = FileWatcher._normalize_path(watch_path)
                rows = conn.execute(
                    "SELECT * FROM tracked_files WHERE watch_path = ?",
                    (normalized,)
                ).fetchall()
            else:
                rows = conn.execute("SELECT * FROM tracked_files").fetchall()

        return [dict(row) for row in rows]

    def get_watched_paths(self) -> List[str]:
        """Get list of currently watched (normalized) paths."""
        with self._lock:
            return list(self._watchers.keys())

    def add_change_callback(self, callback: Callable[[FileChange], None]) -> None:
        """Add a callback invoked for every file change event."""
        self._change_callbacks.append(callback)

    def remove_change_callback(self, callback: Callable[[FileChange], None]) -> None:
        """Remove a previously registered change callback (no-op if absent)."""
        if callback in self._change_callbacks:
            self._change_callbacks.remove(callback)

    def _on_file_change(self, change: FileChange) -> None:
        """Handle a file change event from a watcher (runs on its poll thread)."""
        self._record_change(change)

        # Update file record
        if change.change_type == ChangeType.DELETED:
            self._delete_file_record(change.path)
        else:
            # Find which watch path this belongs to (first prefix match).
            # NOTE(review): _watchers is read here without taking _lock while
            # watch()/unwatch() mutate it under the lock — confirm this race
            # is acceptable.
            watch_path = None
            for wp in self._watchers.keys():
                if change.path.startswith(wp):
                    watch_path = wp
                    break
            if watch_path:
                self._update_file_record(change.path, watch_path, change.size, change.content_hash)

        # Notify callbacks; a failing callback must not break the others.
        for callback in self._change_callbacks:
            try:
                callback(change)
            except Exception:
                pass

    def _record_change(self, change: FileChange) -> None:
        """Append a change event to the file_changes history table."""
        conn = self._ensure_connection()
        with self._lock:
            conn.execute(
                """
                INSERT INTO file_changes (path, change_type, timestamp, old_path, size, content_hash, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    change.path,
                    change.change_type.value,
                    # Stored as an epoch float for range queries in get_changes.
                    change.timestamp.timestamp(),
                    change.old_path,
                    change.size,
                    change.content_hash,
                    json.dumps(change.metadata),
                )
            )

    def _update_file_record(self, path: str, watch_path: str, size: int, content_hash: str) -> None:
        """Update or insert a tracked_files row (first_seen preserved on replace)."""
        conn = self._ensure_connection()
        now = time.time()
        with self._lock:
            conn.execute(
                """
                INSERT OR REPLACE INTO tracked_files
                (path, watch_path, size, content_hash, first_seen, last_modified, last_synced)
                VALUES (?, ?, ?, ?,
                        COALESCE((SELECT first_seen FROM tracked_files WHERE path = ?), ?),
                        ?, ?)
                """,
                (path, watch_path, size, content_hash, path, now, now, now)
            )

    def _delete_file_record(self, path: str) -> None:
        """Delete a file's tracked_files row (change history is kept)."""
        conn = self._ensure_connection()
        with self._lock:
            conn.execute("DELETE FROM tracked_files WHERE path = ?", (path,))

    def cleanup_old_changes(self, days: int = 30) -> int:
        """
        Remove change records older than specified days.

        Args:
            days: Maximum age of records to keep.

        Returns:
            Number of records deleted (sqlite3 cursor.rowcount).
        """
        cutoff = time.time() - (days * 24 * 3600)
        conn = self._ensure_connection()
        with self._lock:
            cursor = conn.execute(
                "DELETE FROM file_changes WHERE timestamp < ?",
                (cutoff,)
            )
            return cursor.rowcount

    def close(self) -> None:
        """Stop all watchers and close database connection.

        Watchers are stopped first so their poll threads stop issuing writes
        before the connection goes away.
        """
        with self._lock:
            for watcher in self._watchers.values():
                watcher.stop()
            self._watchers.clear()

            if self._conn:
                self._conn.close()
                self._conn = None

    def __enter__(self):
        # Context-manager support: `with FileTracker(...) as tracker:`
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always clean up; returning False propagates any in-flight exception.
        self.close()
        return False

807 return False