Coverage for integrations / service_tools / model_storage.py: 89.0%

109 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Model Storage Manager — centralized model storage at ~/.hevolve/models/ 

3 

4Tracks all downloaded models (git repos, HuggingFace weights) in a single 

5manifest.json so the user can see where their disk space is going and 

6the RuntimeToolManager can skip re-downloads. 

7 

8Pattern from: integrations/vision/minicpm_installer.py 

9""" 

10 

11import json 

12import logging 

13import os 

14import shutil 

15import subprocess 

16import sys 

17from datetime import datetime 

18from pathlib import Path 

19from typing import Dict, Optional 

20 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Default storage root: <user home>/.hevolve/models
BASE_DIR = Path.home().joinpath('.hevolve', 'models')
# Manifest file that sits directly inside the default storage root.
MANIFEST_FILE = BASE_DIR.joinpath('manifest.json')

25 

26 

class ModelStorageManager:
    """Centralized model storage with manifest tracking.

    Every tool gets its own directory below ``base_dir``; a single
    ``manifest.json`` in ``base_dir`` records, per tool, the source URL,
    the size in bytes, the download timestamp and the on-disk path.
    """

    def __init__(self, base_dir: Optional[Path] = None):
        """Create the manager and make sure the storage root exists.

        Args:
            base_dir: Storage root. Falls back to the module-level
                ``BASE_DIR`` when omitted.
        """
        self.base_dir = base_dir or BASE_DIR
        self.manifest_file = self.base_dir / 'manifest.json'
        self.base_dir.mkdir(parents=True, exist_ok=True)

    # ── Path helpers ──────────────────────────────────────────────

    def get_tool_dir(self, tool_name: str) -> Path:
        """Return the storage directory for a given tool."""
        return self.base_dir / tool_name

    def _is_inside_base(self, path: Path) -> bool:
        """Return True when *path* resolves to a location strictly below base_dir."""
        return self.base_dir.resolve() in path.resolve().parents

    # ── Manifest I/O ─────────────────────────────────────────────

    def _read_manifest(self) -> Dict:
        """Load the manifest, falling back to an empty one when unusable.

        A missing file yields an empty manifest silently. A file that
        cannot be read, is not valid UTF-8 / JSON, or does not hold a
        JSON object with a dict-valued ``"tools"`` entry is reported as
        corrupt and also yields an empty manifest.
        """
        try:
            raw = self.manifest_file.read_text(encoding='utf-8')
        except FileNotFoundError:
            return {"tools": {}}
        except (UnicodeDecodeError, OSError):
            logger.warning("Corrupt manifest.json — resetting")
            return {"tools": {}}

        try:
            data = json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            logger.warning("Corrupt manifest.json — resetting")
            return {"tools": {}}

        # Callers rely on dict semantics (.get / .setdefault) for both the
        # manifest itself and its "tools" entry.
        if not isinstance(data, dict) or not isinstance(data.get("tools", {}), dict):
            logger.warning("Corrupt manifest.json — resetting")
            return {"tools": {}}
        return data

    def _write_manifest(self, data: Dict) -> None:
        """Persist *data* as the manifest.

        The JSON is written to a sibling temporary file first and then
        moved over the real manifest with ``os.replace``, so an
        interrupted write cannot leave a truncated manifest behind.
        """
        payload = json.dumps(data, indent=2)
        self.manifest_file.parent.mkdir(parents=True, exist_ok=True)
        tmp_file = self.manifest_file.with_name(self.manifest_file.name + '.tmp')
        tmp_file.write_text(payload, encoding='utf-8')
        os.replace(tmp_file, self.manifest_file)

    def get_manifest(self) -> Dict:
        """Return the full manifest."""
        return self._read_manifest()

    # ── Download state ───────────────────────────────────────────

    def is_downloaded(self, tool_name: str) -> bool:
        """Check if a tool's models are already downloaded.

        True only when the manifest has an entry for the tool AND its
        directory exists and contains at least one item.
        """
        entry = self._read_manifest().get("tools", {}).get(tool_name)
        if not entry:
            return False
        tool_dir = self.get_tool_dir(tool_name)
        # is_dir() rather than exists(): iterdir() raises on a regular file.
        return tool_dir.is_dir() and any(tool_dir.iterdir())

    def mark_downloaded(self, tool_name: str, source_url: str,
                        size_bytes: int = 0) -> None:
        """Record that a tool's models have been downloaded.

        Args:
            tool_name: Manifest key for the tool.
            source_url: Where the files came from.
            size_bytes: Size on disk to record (defaults to 0).
        """
        manifest = self._read_manifest()
        manifest.setdefault("tools", {})[tool_name] = {
            "source_url": source_url,
            "size_bytes": size_bytes,
            "downloaded_at": datetime.now().isoformat(),
            "path": str(self.get_tool_dir(tool_name)),
        }
        self._write_manifest(manifest)
        logger.info(f"Marked {tool_name} as downloaded ({size_bytes / 1e9:.2f} GB)")

    # ── Size tracking ────────────────────────────────────────────

    @staticmethod
    def _dir_size(directory: Path) -> int:
        """Sum the sizes of all regular files below *directory* (0 if absent)."""
        if not directory.exists():
            return 0
        total = 0
        for entry in directory.rglob('*'):
            try:
                if entry.is_file():
                    total += entry.stat().st_size
            except OSError:
                # The file vanished or became unreadable mid-walk; skip it.
                continue
        return total

    def get_tool_size(self, tool_name: str) -> int:
        """Return total bytes used by a tool's directory."""
        return self._dir_size(self.get_tool_dir(tool_name))

    def get_total_size(self) -> int:
        """Return total bytes used by everything under the storage root."""
        return self._dir_size(self.base_dir)

    # ── Git clone ────────────────────────────────────────────────

    @staticmethod
    def _run_git(args, timeout: int, cwd: Optional[Path] = None):
        """Run ``git <args>`` with captured text output and a timeout."""
        run_kwargs = dict(capture_output=True, text=True, timeout=timeout)
        if cwd is not None:
            run_kwargs['cwd'] = str(cwd)
        if sys.platform == 'win32':
            # Keep a console window from flashing up on Windows.
            run_kwargs['creationflags'] = subprocess.CREATE_NO_WINDOW
        return subprocess.run(['git', *args], **run_kwargs)

    def clone_repo(self, tool_name: str, repo_url: str,
                   branch: str = None) -> Optional[Path]:
        """Clone (or pull) a git repo into the tool's directory.

        When the directory already holds a ``.git`` entry, ``git pull``
        is attempted; a failing pull is logged and the existing checkout
        is still returned. Otherwise a shallow clone (``--depth 1``) is
        made and recorded in the manifest.

        Args:
            tool_name: Tool whose directory receives the checkout.
            repo_url: Repository to clone.
            branch: Optional branch passed to ``git clone --branch``.

        Returns:
            The tool directory on success, None on failure.
        """
        tool_dir = self.get_tool_dir(tool_name)

        if tool_dir.exists() and (tool_dir / '.git').exists():
            # Already cloned — pull latest
            logger.info(f"Pulling latest for {tool_name}...")
            try:
                pulled = self._run_git(['pull'], timeout=120, cwd=tool_dir)
                if pulled.returncode != 0:
                    logger.warning(
                        f"git pull failed for {tool_name}: {pulled.stderr[:300]}")
            except Exception as e:
                logger.warning(f"git pull failed for {tool_name}: {e}")
            return tool_dir  # still usable even when the pull failed

        # Fresh clone
        logger.info(f"Cloning {repo_url} into {tool_dir}...")
        # Remember whether the target already held files: a failed clone may
        # only clean up what this call itself produced.
        had_content = tool_dir.is_dir() and any(tool_dir.iterdir())
        tool_dir.mkdir(parents=True, exist_ok=True)
        clone_args = ['clone', '--depth', '1']
        if branch:
            clone_args += ['--branch', branch]
        clone_args += [repo_url, str(tool_dir)]

        try:
            result = self._run_git(clone_args, timeout=300)
            if result.returncode != 0:
                logger.error(f"git clone failed: {result.stderr[:300]}")
                self._discard_failed_clone(tool_dir, had_content)
                return None
            size = self.get_tool_size(tool_name)
            self.mark_downloaded(tool_name, repo_url, size)
            return tool_dir
        except Exception as e:
            logger.error(f"git clone failed for {tool_name}: {e}")
            self._discard_failed_clone(tool_dir, had_content)
            return None

    def _discard_failed_clone(self, tool_dir: Path, had_content: bool) -> None:
        """Delete the leftovers of a failed clone.

        Without this, a partial checkout containing ``.git`` (e.g. after a
        timeout) would be mistaken for a finished clone on the next call.
        Nothing is removed when the directory held files before the clone
        started or when it is not strictly inside the storage root.
        """
        if not had_content and self._is_inside_base(tool_dir):
            shutil.rmtree(tool_dir, ignore_errors=True)

    # ── HuggingFace download ─────────────────────────────────────

    def download_hf_model(self, tool_name: str, repo_id: str,
                          **kwargs) -> Optional[Path]:
        """Download a HuggingFace model using snapshot_download.

        Pattern from minicpm_installer.py.

        Args:
            tool_name: Tool whose directory receives the files.
            repo_id: HuggingFace repository id.
            **kwargs: Forwarded to ``snapshot_download`` unchanged.

        Returns:
            The tool directory on success (or when already downloaded),
            None on failure.
        """
        tool_dir = self.get_tool_dir(tool_name)

        if self.is_downloaded(tool_name):
            logger.info(f"HF model for {tool_name} already downloaded")
            return tool_dir

        # Import on its own so that only a genuinely missing package is
        # reported as "not installed"; ImportErrors raised later, during the
        # download, go through the generic failure branch instead.
        try:
            from huggingface_hub import snapshot_download
        except ImportError:
            logger.error("huggingface_hub not installed. pip install huggingface_hub")
            return None

        tool_dir.mkdir(parents=True, exist_ok=True)

        try:
            logger.info(f"Downloading {repo_id} to {tool_dir}...")
            snapshot_download(
                repo_id=repo_id,
                local_dir=str(tool_dir),
                local_dir_use_symlinks=False,
                **kwargs,
            )
            size = self.get_tool_size(tool_name)
            self.mark_downloaded(tool_name, f"hf://{repo_id}", size)
            return tool_dir
        except Exception as e:
            logger.error(f"HF download failed for {tool_name}: {e}")
            return None

    # ── Cleanup ──────────────────────────────────────────────────

    def remove_tool(self, tool_name: str) -> bool:
        """Remove a tool's models and manifest entry.

        Returns:
            True when the tool directory is gone afterwards. False when
            the name does not resolve to a path strictly inside the
            storage root (empty name, ``..`` segments, absolute path) —
            nothing is touched in that case — or when the directory
            could not be deleted.
        """
        tool_dir = self.get_tool_dir(tool_name)
        # An empty name resolves to base_dir itself and ".." / absolute names
        # escape it; deleting either would destroy unrelated data.
        if not self._is_inside_base(tool_dir):
            logger.error(
                f"Refusing to remove {tool_name!r}: not inside model storage")
            return False

        if tool_dir.exists():
            shutil.rmtree(tool_dir, ignore_errors=True)

        manifest = self._read_manifest()
        manifest.get("tools", {}).pop(tool_name, None)
        self._write_manifest(manifest)

        if tool_dir.exists():
            # rmtree swallowed an error; report the failure to the caller.
            logger.warning(f"Could not fully remove {tool_dir}")
            return False

        logger.info(f"Removed {tool_name} from model storage")
        return True

202 

203 

# Shared module-level instance, built at import time with the default
# storage root (no base_dir argument is passed).
model_storage = ModelStorageManager()