Coverage for integrations / service_tools / model_storage.py: 89.0%
109 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Model Storage Manager — centralized model storage at ~/.hevolve/models/

Tracks all downloaded models (git repos, HuggingFace weights) in a single
manifest.json so the user can see where their disk space is going and
the RuntimeToolManager can skip re-downloads.

Pattern from: integrations/vision/minicpm_installer.py
9"""
11import json
12import logging
13import os
14import shutil
15import subprocess
16import sys
17from datetime import datetime
18from pathlib import Path
19from typing import Dict, Optional
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Default storage root (~/.hevolve/models) and the manifest file kept in it.
BASE_DIR = Path.home().joinpath('.hevolve', 'models')
MANIFEST_FILE = BASE_DIR.joinpath('manifest.json')
class ModelStorageManager:
    """Centralized model storage with manifest tracking.

    Each tool owns one sub-directory of ``base_dir``. A single
    ``manifest.json`` inside ``base_dir`` records, per tool, the source URL,
    the size in bytes, the download timestamp and the on-disk path.
    """

    def __init__(self, base_dir: Optional[Path] = None):
        """Set up the storage root and create it on disk if missing.

        Args:
            base_dir: Storage root (``Path`` or ``str``). The module-level
                ``BASE_DIR`` is used when omitted.
        """
        self.base_dir = Path(base_dir) if base_dir else BASE_DIR
        self.manifest_file = self.base_dir / 'manifest.json'
        self.base_dir.mkdir(parents=True, exist_ok=True)

    # ── Path helpers ──────────────────────────────────────────────

    def get_tool_dir(self, tool_name: str) -> Path:
        """Return the storage directory for a given tool."""
        return self.base_dir / tool_name

    # ── Manifest I/O ─────────────────────────────────────────────

    def _read_manifest(self) -> Dict:
        """Load the manifest, falling back to an empty one.

        Returns:
            The parsed manifest dict, or ``{"tools": {}}`` when the file is
            missing, unreadable, not valid JSON, or not shaped like a
            manifest (top level and ``"tools"`` must both be objects).
        """
        try:
            data = json.loads(self.manifest_file.read_text(encoding='utf-8'))
        except FileNotFoundError:
            # No manifest yet is the normal first-run state, not corruption.
            return {"tools": {}}
        except (ValueError, OSError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.warning("Corrupt manifest.json — resetting")
            return {"tools": {}}

        # Valid JSON of the wrong shape would crash every caller on `.get`.
        if not isinstance(data, dict) or not isinstance(data.get("tools", {}), dict):
            logger.warning("Corrupt manifest.json — resetting")
            return {"tools": {}}
        return data

    def _write_manifest(self, data: Dict) -> None:
        """Persist ``data`` as the manifest, replacing the old file atomically.

        The JSON is written to a sibling temp file and then moved over the
        real manifest, so an interrupted write never leaves a truncated
        ``manifest.json`` behind.
        """
        payload = json.dumps(data, indent=2)  # serialize before touching disk
        self.base_dir.mkdir(parents=True, exist_ok=True)
        tmp_file = self.manifest_file.with_name(self.manifest_file.name + '.tmp')
        tmp_file.write_text(payload, encoding='utf-8')
        os.replace(tmp_file, self.manifest_file)

    def get_manifest(self) -> Dict:
        """Return the full manifest."""
        return self._read_manifest()

    # ── Download state ───────────────────────────────────────────

    def is_downloaded(self, tool_name: str) -> bool:
        """Check if a tool's models are already downloaded.

        A tool counts as downloaded only when it has a (non-empty) manifest
        entry AND its directory exists and contains at least one entry.
        """
        entry = self._read_manifest().get("tools", {}).get(tool_name)
        if not entry:
            return False
        # Also verify the directory actually exists; is_dir() (rather than
        # exists()) keeps a stray regular file from blowing up iterdir().
        tool_dir = self.get_tool_dir(tool_name)
        return tool_dir.is_dir() and any(tool_dir.iterdir())

    def mark_downloaded(self, tool_name: str, source_url: str,
                        size_bytes: int = 0) -> None:
        """Record that a tool's models have been downloaded.

        Args:
            tool_name: Key of the manifest entry to create or overwrite.
            source_url: Where the models came from.
            size_bytes: Size to record for the tool, in bytes.
        """
        manifest = self._read_manifest()
        manifest.setdefault("tools", {})[tool_name] = {
            "source_url": source_url,
            "size_bytes": size_bytes,
            # Naive local time, as produced by datetime.now().
            "downloaded_at": datetime.now().isoformat(),
            "path": str(self.get_tool_dir(tool_name)),
        }
        self._write_manifest(manifest)
        logger.info(f"Marked {tool_name} as downloaded ({size_bytes / 1e9:.2f} GB)")

    # ── Size tracking ────────────────────────────────────────────

    @staticmethod
    def _dir_size(root: Path) -> int:
        """Sum the sizes of all regular files below ``root`` (0 if missing)."""
        if not root.exists():
            return 0
        total = 0
        for entry in root.rglob('*'):
            try:
                if entry.is_file():
                    total += entry.stat().st_size
            except OSError:
                # File vanished or became unreadable mid-walk; skip it.
                continue
        return total

    def get_tool_size(self, tool_name: str) -> int:
        """Return total bytes used by a tool's directory."""
        return self._dir_size(self.get_tool_dir(tool_name))

    def get_total_size(self) -> int:
        """Return total bytes used under ``base_dir``.

        This walks the whole storage root, so ``manifest.json`` itself is
        included in the figure.
        """
        return self._dir_size(self.base_dir)

    # ── Git clone ────────────────────────────────────────────────

    @staticmethod
    def _run_git(args, timeout: int, cwd: Optional[Path] = None):
        """Run ``git <args>`` with captured text output and a timeout."""
        run_kwargs = dict(capture_output=True, text=True, errors='replace',
                          timeout=timeout)
        if cwd is not None:
            run_kwargs['cwd'] = str(cwd)
        if sys.platform == 'win32':
            # Avoid flashing a console window when run from a GUI process.
            run_kwargs['creationflags'] = subprocess.CREATE_NO_WINDOW
        return subprocess.run(['git', *args], **run_kwargs)

    def clone_repo(self, tool_name: str, repo_url: str,
                   branch: str = None) -> Optional[Path]:
        """Clone (or pull) a git repo into the tool's directory.

        Args:
            tool_name: Tool whose directory receives the repo.
            repo_url: Repository to clone.
            branch: Optional branch to check out on a fresh clone.

        Returns:
            The tool directory on success, None on failure. When the repo is
            already cloned, the directory is returned even if the pull fails.
        """
        tool_dir = self.get_tool_dir(tool_name)

        if (tool_dir / '.git').exists():
            # Already cloned — pull latest
            logger.info(f"Pulling latest for {tool_name}...")
            try:
                pulled = self._run_git(['pull'], timeout=120, cwd=tool_dir)
                if pulled.returncode != 0:
                    # subprocess.run does not raise on a non-zero exit.
                    logger.warning(
                        f"git pull failed for {tool_name}: {pulled.stderr[:300]}")
            except Exception as e:
                logger.warning(f"git pull failed for {tool_name}: {e}")
            return tool_dir  # still usable

        # Fresh clone
        logger.info(f"Cloning {repo_url} into {tool_dir}...")
        # Only clean up after a failure if nothing of the user's was here
        # before; a pre-existing non-empty directory is never deleted.
        cleanup_on_failure = (not tool_dir.exists()
                              or (tool_dir.is_dir() and not any(tool_dir.iterdir())))
        tool_dir.mkdir(parents=True, exist_ok=True)
        clone_args = ['clone', '--depth', '1']
        if branch:
            clone_args += ['--branch', branch]
        # '--' stops a URL that starts with '-' from being read as an option.
        clone_args += ['--', repo_url, str(tool_dir)]

        failure = None
        try:
            result = self._run_git(clone_args, timeout=300)
            if result.returncode != 0:
                failure = f"git clone failed: {result.stderr[:300]}"
        except Exception as e:
            failure = f"git clone failed for {tool_name}: {e}"

        if failure is not None:
            logger.error(failure)
            if cleanup_on_failure:
                # A killed clone can leave a partial .git behind, which the
                # next call would mistake for a usable checkout.
                shutil.rmtree(tool_dir, ignore_errors=True)
            return None

        try:
            self.mark_downloaded(tool_name, repo_url, self.get_tool_size(tool_name))
        except Exception as e:
            logger.error(f"git clone failed for {tool_name}: {e}")
            return None
        return tool_dir

    # ── HuggingFace download ─────────────────────────────────────

    def download_hf_model(self, tool_name: str, repo_id: str,
                          **kwargs) -> Optional[Path]:
        """Download a HuggingFace model using snapshot_download.

        Pattern from minicpm_installer.py.

        Args:
            tool_name: Tool whose directory receives the snapshot.
            repo_id: HuggingFace repository id.
            **kwargs: Forwarded unchanged to ``snapshot_download``.

        Returns:
            The tool directory on success (or if already downloaded),
            None on failure.
        """
        tool_dir = self.get_tool_dir(tool_name)

        if self.is_downloaded(tool_name):
            logger.info(f"HF model for {tool_name} already downloaded")
            return tool_dir

        # Import first so a missing dependency does not leave an empty
        # tool directory behind.
        try:
            from huggingface_hub import snapshot_download
        except ImportError:
            logger.error("huggingface_hub not installed. pip install huggingface_hub")
            return None

        tool_dir.mkdir(parents=True, exist_ok=True)

        try:
            logger.info(f"Downloading {repo_id} to {tool_dir}...")
            # NOTE(review): local_dir_use_symlinks appears to be deprecated in
            # recent huggingface_hub releases — confirm against the pinned version.
            snapshot_download(
                repo_id=repo_id,
                local_dir=str(tool_dir),
                local_dir_use_symlinks=False,
                **kwargs,
            )
            size = self.get_tool_size(tool_name)
            self.mark_downloaded(tool_name, f"hf://{repo_id}", size)
            return tool_dir
        except Exception as e:
            logger.error(f"HF download failed for {tool_name}: {e}")
            return None

    # ── Cleanup ──────────────────────────────────────────────────

    @staticmethod
    def _retry_after_chmod(func, path, _exc):
        """rmtree error hook: make ``path`` writable and retry ``func`` once."""
        import stat
        try:
            os.chmod(path, stat.S_IRWXU)
            func(path)
        except (OSError, TypeError):
            # Best effort: the caller checks afterwards whether anything is left.
            pass

    def remove_tool(self, tool_name: str) -> bool:
        """Remove a tool's models and manifest entry.

        Returns:
            True when the tool directory is gone afterwards. False when the
            name does not point strictly inside ``base_dir`` (nothing is
            touched) or when some files could not be deleted (the manifest
            entry is still dropped).
        """
        tool_dir = self.get_tool_dir(tool_name)

        # Lexical containment check: '' or '..' would otherwise resolve to
        # the storage root (or above it) and wipe far more than one tool.
        root = Path(os.path.abspath(self.base_dir))
        target = Path(os.path.abspath(tool_dir))
        if root not in target.parents:
            logger.error(f"Refusing to remove {tool_dir}: not inside {self.base_dir}")
            return False

        if tool_dir.exists():
            # Read-only files (e.g. git objects on Windows) make a plain
            # rmtree fail; the hook clears the flag and retries.
            hook_name = 'onexc' if sys.version_info >= (3, 12) else 'onerror'
            try:
                shutil.rmtree(tool_dir, **{hook_name: self._retry_after_chmod})
            except OSError as e:
                logger.warning(f"Error while removing {tool_dir}: {e}")

        manifest = self._read_manifest()
        manifest.get("tools", {}).pop(tool_name, None)
        self._write_manifest(manifest)

        if tool_dir.exists():
            logger.warning(f"Could not fully remove {tool_dir}")
            return False

        logger.info(f"Removed {tool_name} from model storage")
        return True
# Shared module-level instance. It is built at import time with the default
# storage root, so importing this module creates that directory if missing.
model_storage: ModelStorageManager = ModelStorageManager()