Coverage for core / hub_allowlist.py: 0.0%

90 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2core.hub_allowlist — runtime-editable trusted HuggingFace org allowlist. 

3 

4WHY THIS EXISTS 

5─────────────── 

6The trusted-publisher list used to live as a frozenset literal at 

7main.py:1568 (`_TRUSTED_HF_ORGS`). Adding a new trusted org (e.g., 

8an enterprise tenant's internal HF org `acme-corp`) required a code 

9edit, a release, and a redeploy — friction high enough that the field 

10team just told customers to pass `confirm_unverified=True` instead. 

11That defeated the entire safety gate the list was meant to enforce. 

12 

13This module persists the list to `~/.nunba/hub_allowlist.json` and 

14exposes `add(org, reason)` / `remove(org)` / `is_trusted(org)` / 

15`list()` so the operator can manage it via the admin UI without a 

16release. 

17 

18THREAT MODEL 

19──────────── 

20- Typosquat orgs (Unicode homoglyphs handled separately by 

21 `_normalize_hf_id` in main.py). 

22- Drive-by "hot on HF Hub" installs of pickled weights with arbitrary- 

23 code-execution payloads in `torch.load()`. 

24- Supply-chain attacks via account-takeover of a previously-trusted org 

25 (handled via `remove()` — operator can rapidly revoke without a release). 

26 

27DESIGN NOTES 

28──────────── 

29- Case-insensitive matching (HF org names are case-preserved but 

30 case-insensitive at the resolver level; `Qwen` and `qwen` resolve to 

31 the same repo, so an attacker can't slip in a `Qwen` clone if `qwen` 

32 is allowed). 

33- Reasons are persisted alongside each entry so the next operator can 

34 see WHY an org was added (audit trail without a separate log channel). 

35- Default-seeded from `DEFAULT_TRUSTED_ORGS` (the 27 orgs carried over 

36 from the previous frozenset), so cold installs preserve the existing trust posture. 

37- File layout uses indented JSON for git-friendly diffs when an operator 

38 checks the file into a config repo. 

39- The path defaults to `~/.nunba/hub_allowlist.json` to follow the same 

40 pattern as `~/.nunba/mcp.token` — operators learn ONE config location. 

41""" 

42from __future__ import annotations 

43 

44import json 

45import logging 

46import os 

47import threading 

48import time 

49from pathlib import Path 

50from typing import Dict, List, Optional 

51 

52logger = logging.getLogger(__name__) 

53 

54 

55# Seeded defaults — extracted verbatim from the previous _TRUSTED_HF_ORGS 

56# frozenset in main.py. DO NOT prune entries here without operator 

57# notice; existing installs that re-seed will lose previously-trusted orgs. 

DEFAULT_TRUSTED_ORGS: Dict[str, str] = {
    # Maps org name (case as published) -> human-readable reason that is
    # persisted next to the entry when the allowlist is first seeded.
    "google": "Google AI / Gemma / Gemini ecosystem",
    "microsoft": "Microsoft (Phi, Bing) — verified publisher",
    "openai": "OpenAI official org",
    "meta-llama": "Meta Llama foundation models",
    "mistralai": "Mistral AI official",
    "Qwen": "Alibaba Qwen foundation models",
    "ai4bharat": "AI4Bharat — Indic language ecosystem (T9 cohort)",
    "facebook": "Meta (legacy facebook namespace)",
    "HuggingFaceTB": "HuggingFace TB org (small efficient models)",
    "HuggingFaceH4": "HuggingFace H4 org (instruction-tuned variants)",
    # Text-to-speech publishers.
    "suno": "Suno (Bark TTS)",
    "coqui": "Coqui TTS / XTTS",
    "hexgrad": "Kokoro TTS publisher",
    "SparkAudio": "Spark TTS publisher",
    "nvidia": "NVIDIA official models (NeMo)",
    "NousResearch": "Nous Research (Hermes, etc.)",
    "pyannote": "pyannote-audio diarization",
    "openai-community": "OpenAI community-mirror org",
    "sentence-transformers": "Sentence Transformers (embeddings)",
    "BAAI": "Beijing Academy of AI",
    "intfloat": "intfloat (E5 embedding family)",
    "mixedbread-ai": "mixedbread-ai (mxbai embeddings)",
    "stabilityai": "Stability AI (SD, SDXL, Stable Audio)",
    "runwayml": "RunwayML (legacy SD checkpoints)",
    "CompVis": "CompVis (legacy SD checkpoints)",
    # First-party namespaces (two spellings are listed on purpose).
    "hertz-ai": "Hevolve / HARTOS / Nunba — first-party",
    "HertzAI": "Hevolve / HARTOS / Nunba — first-party (capitalized)",
}

87 

88 

def _default_path() -> Path:
    """Return the default allowlist location under the user's home directory.

    Resolves to ``<home>/.nunba/hub_allowlist.json``, where ``<home>`` is
    whatever ``Path.home()`` reports on the current platform. The
    ``.nunba`` directory is the same config root the module docstring
    describes for ``mcp.token``.
    """
    config_root = Path.home().joinpath('.nunba')
    return config_root.joinpath('hub_allowlist.json')

94 

95 

class HubAllowlist:
    """Persistent, runtime-editable allowlist of trusted HF org publishers.

    Concurrency model
    -----------------
    Writers (`add` / `remove`) serialize on an internal re-entrant lock and
    never mutate the live mapping in place: they build a new dict and
    rebind ``self._entries``. Readers (`is_trusted`, `list`) grab the
    current dict once and iterate it without taking the lock, so a
    concurrent write can never raise "dictionary changed size during
    iteration" on the `is_trusted` hot path.

    Matching is case-insensitive throughout; the stored key keeps the
    casing the operator supplied.
    """

    def __init__(self, config_path: Optional[Path] = None) -> None:
        """Bind to *config_path* and load it (or seed defaults).

        Args:
            config_path: JSON file backing the allowlist. When falsy, the
                location returned by `_default_path()` is used.

        Note: construction touches the disk — it reads the file, or writes
        a freshly seeded one when no usable file exists.
        """
        self._path = Path(config_path) if config_path else _default_path()
        self._lock = threading.RLock()
        self._entries: Dict[str, Dict[str, object]] = {}
        self._load_or_seed()

    # ── Persistence ────────────────────────────────────────────────────

    def _load_or_seed(self) -> None:
        """Load entries from disk, or seed and persist the defaults.

        Accepted on-disk shapes:
          * dict of ``org -> {metadata}`` (current format);
          * dict of ``org -> <anything>`` (value is stringified as reason);
          * list of org strings (legacy; non-string items are skipped).

        If the file exists but cannot be read or parsed, startup is not
        aborted: the problem is logged, the unreadable file is moved aside
        (so operator edits are not silently destroyed by the re-seed), and
        the defaults are written in its place.
        """
        if self._path.is_file():
            try:
                with self._path.open(encoding='utf-8') as f:
                    raw = json.load(f)
                now = time.time()
                # Tolerate legacy formats: list-of-strings → minimal entries.
                if isinstance(raw, list):
                    self._entries = {
                        org: {'reason': '(legacy import)', 'added_at': now}
                        for org in raw if isinstance(org, str)
                    }
                elif isinstance(raw, dict):
                    self._entries = {
                        k: dict(v) if isinstance(v, dict)
                        else {'reason': str(v), 'added_at': now}
                        for k, v in raw.items()
                    }
                else:
                    raise ValueError(f"unexpected JSON root type: {type(raw)}")
                return
            except Exception as e:
                # Deliberately broad: a corrupt config must not kill
                # startup. Log loudly and fall through to the defaults.
                logger.warning(
                    "hub_allowlist: failed to load %s (%s) — seeding defaults",
                    self._path, e,
                )
                self._quarantine_unreadable_file()
        self._seed_defaults()
        self._save()

    def _quarantine_unreadable_file(self) -> None:
        """Best-effort: move the unreadable config to ``<name>.corrupt``.

        Keeps the operator's previous content recoverable before the
        defaults overwrite the original path. Failure to move is logged
        and otherwise ignored.
        """
        backup = self._path.with_name(self._path.name + '.corrupt')
        try:
            os.replace(self._path, backup)
        except OSError as e:
            logger.warning(
                "hub_allowlist: could not preserve unreadable config %s (%s)",
                self._path, e,
            )
        else:
            logger.warning(
                "hub_allowlist: unreadable config preserved at %s", backup,
            )

    def _seed_defaults(self) -> None:
        """Replace the in-memory entries with `DEFAULT_TRUSTED_ORGS`."""
        now = time.time()
        self._entries = {
            org: {'reason': reason, 'added_at': now}
            for org, reason in DEFAULT_TRUSTED_ORGS.items()
        }

    def _save(self) -> None:
        """Write the entries as indented, key-sorted JSON.

        The data goes to a sibling temp file first and is then moved over
        the real path with `os.replace`, so the target is never left
        half-written. `OSError` is logged, not raised: persistence is
        best-effort and the in-memory state stays authoritative.
        """
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            tmp = self._path.with_suffix('.json.tmp')
            with tmp.open('w', encoding='utf-8') as f:
                json.dump(self._entries, f, indent=2, sort_keys=True)
            os.replace(tmp, self._path)
        except OSError as e:
            logger.warning("hub_allowlist: save failed (%s)", e)

    # ── Read ───────────────────────────────────────────────────────────

    def is_trusted(self, org: str) -> bool:
        """Return True if *org* matches a stored entry, ignoring case.

        Surrounding whitespace on *org* is ignored. Non-string, empty, or
        whitespace-only input is never trusted.
        """
        if not org or not isinstance(org, str):
            return False
        target = org.strip().lower()
        if not target:
            return False
        # Single read of the attribute: writers rebind it to a new dict
        # instead of mutating, so this snapshot is safe to iterate unlocked.
        entries = self._entries
        return any(known.lower() == target for known in entries)

    def list(self) -> List[Dict[str, object]]:
        """Return every entry as ``{'org', 'reason', 'added_at'}``.

        Entries are sorted by org key. Missing metadata falls back to
        ``''`` for the reason and ``0`` for the timestamp.
        """
        entries = self._entries  # snapshot; see is_trusted
        return [
            {
                'org': org,
                'reason': info.get('reason', ''),
                'added_at': info.get('added_at', 0),
            }
            for org, info in sorted(entries.items())
        ]

    # ── Write ──────────────────────────────────────────────────────────

    def add(self, org: str, reason: str) -> None:
        """Add (or replace) an org together with a human-readable reason.

        Both values are stripped of surrounding whitespace before being
        validated and stored. An existing entry for the same org under a
        different casing is replaced rather than duplicated, so a later
        `remove` cannot leave a stale variant behind.

        Raises:
            ValueError: if *org* is not a string, is empty after
                stripping, contains '/' or any whitespace character, or
                contains non-ASCII characters; or if *reason* is not a
                string or is empty after stripping.
        """
        if not isinstance(org, str) or not org.strip():
            raise ValueError("org must be a non-empty string")
        org_clean = org.strip()
        # Reject every whitespace character (tabs, newlines, ...), not
        # just the literal space.
        if '/' in org_clean or any(ch.isspace() for ch in org_clean):
            raise ValueError("org must not contain '/' or whitespace")
        # ASCII-only — homoglyph defense.
        if any(ord(c) > 0x7F for c in org_clean):
            raise ValueError(
                "org must be ASCII-only (Unicode homoglyph defense)"
            )
        if not isinstance(reason, str) or not reason.strip():
            raise ValueError("reason must be a non-empty string")
        folded = org_clean.lower()
        with self._lock:
            # Build a new dict, then rebind: lock-free readers keep
            # iterating the old one undisturbed.
            updated = {
                key: info for key, info in self._entries.items()
                if key.lower() != folded
            }
            updated[org_clean] = {
                'reason': reason.strip(),
                'added_at': time.time(),
            }
            self._entries = updated
            self._save()

    def remove(self, org: str) -> bool:
        """Remove an org, matching case-insensitively.

        Every stored key that matches is dropped, so case-variant
        duplicates (e.g. from a hand-edited file) cannot keep a revoked
        org trusted.

        Returns:
            True if at least one entry was removed, False if nothing
            matched or *org* was not a non-empty string.
        """
        if not org or not isinstance(org, str):
            return False
        target = org.strip().lower()
        with self._lock:
            current = self._entries
            kept = {
                key: info for key, info in current.items()
                if key.lower() != target
            }
            if len(kept) == len(current):
                return False
            self._entries = kept
            self._save()
            return True

233 

234 

# ── Process-wide singleton state ─────────────────────────────────────────
# Nothing is constructed at import time: `_INSTANCE` starts out empty and
# is filled in by `get_allowlist()` on first use; `reset_for_tests()`
# empties it again. Both rebind it only while holding `_INSTANCE_LOCK`.
_INSTANCE_LOCK = threading.Lock()
_INSTANCE: Optional[HubAllowlist] = None

240 

241 

def get_allowlist() -> HubAllowlist:
    """Return the shared `HubAllowlist`, creating it on first call.

    The instance is built with the default config path. Creation happens
    under `_INSTANCE_LOCK`, and the emptiness check is repeated once the
    lock is held so that only one caller constructs the instance.
    """
    global _INSTANCE
    if _INSTANCE is not None:
        # Already built: skip the lock entirely.
        return _INSTANCE
    with _INSTANCE_LOCK:
        if _INSTANCE is None:
            _INSTANCE = HubAllowlist()
    return _INSTANCE

250 

251 

def reset_for_tests() -> None:
    """Drop the shared instance so the next `get_allowlist()` rebuilds it.

    Intended for test code only. The reset is performed while holding
    `_INSTANCE_LOCK`; the config file on disk is not touched.
    """
    global _INSTANCE
    with _INSTANCE_LOCK:
        _INSTANCE = None

257 

258 

# Names exported by `from core.hub_allowlist import *`.
__all__ = ['DEFAULT_TRUSTED_ORGS', 'HubAllowlist', 'get_allowlist', 'reset_for_tests']

264]