Coverage for core / hub_allowlist.py: 0.0%
90 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2core.hub_allowlist — runtime-editable trusted HuggingFace org allowlist.
4WHY THIS EXISTS
5───────────────
6The trusted-publisher list used to live as a frozenset literal at
7main.py:1568 (`_TRUSTED_HF_ORGS`). Adding a new trusted org (e.g.,
8an enterprise tenant's internal HF org `acme-corp`) required a code
9edit, a release, and a redeploy — friction high enough that the field
10team just told customers to pass `confirm_unverified=True` instead.
11That defeated the entire safety gate the list was meant to enforce.
13This module persists the list to `~/.nunba/hub_allowlist.json` and
14exposes `add(org, reason)` / `remove(org)` / `is_trusted(org)` /
15`list()` so the operator can manage it via the admin UI without a
16release.
18THREAT MODEL
19────────────
20- Typosquat orgs (Unicode homoglyphs handled separately by
21 `_normalize_hf_id` in main.py).
22- Drive-by "hot on HF Hub" installs of pickled weights with arbitrary-
23 code-execution payloads in `torch.load()`.
24- Supply-chain attacks via account-takeover of a previously-trusted org
25 (handled via `remove()` — operator can rapidly revoke without a release).
27DESIGN NOTES
28────────────
29- Case-insensitive matching (HF org names are case-preserved but
30 case-insensitive at the resolver level; `Qwen` and `qwen` resolve to
31 the same repo, so an attacker can't slip in a `Qwen` clone if `qwen`
32 is allowed).
33- Reasons are persisted alongside each entry so the next operator can
34 see WHY an org was added (audit trail without a separate log channel).
35- Default-seeded with the orgs carried over from the previous frozenset
36 (see `DEFAULT_TRUSTED_ORGS`), so cold installs preserve the existing trust posture.
37- File layout uses indented JSON for git-friendly diffs when an operator
38 checks the file into a config repo.
39- The path defaults to `~/.nunba/hub_allowlist.json` to follow the same
40 pattern as `~/.nunba/mcp.token` — operators learn ONE config location.
41"""
42from __future__ import annotations
44import json
45import logging
46import os
47import threading
48import time
49from pathlib import Path
50from typing import Dict, List, Optional
logger = logging.getLogger(__name__)


# Built-in seed list, mapping each trusted HF org to the human-readable
# reason it is trusted. Per the original note, these were carried over
# verbatim from the former `_TRUSTED_HF_ORGS` frozenset in main.py.
#
# Do NOT drop entries without telling operators first: an install that
# re-seeds from this table would silently stop trusting the removed org.
DEFAULT_TRUSTED_ORGS: Dict[str, str] = {
    "google": "Google AI / Gemma / Gemini ecosystem",
    "microsoft": "Microsoft (Phi, Bing) — verified publisher",
    "openai": "OpenAI official org",
    "meta-llama": "Meta Llama foundation models",
    "mistralai": "Mistral AI official",
    "Qwen": "Alibaba Qwen foundation models",
    "ai4bharat": "AI4Bharat — Indic language ecosystem (T9 cohort)",
    "facebook": "Meta (legacy facebook namespace)",
    "HuggingFaceTB": "HuggingFace TB org (small efficient models)",
    "HuggingFaceH4": "HuggingFace H4 org (instruction-tuned variants)",
    "suno": "Suno (Bark TTS)",
    "coqui": "Coqui TTS / XTTS",
    "hexgrad": "Kokoro TTS publisher",
    "SparkAudio": "Spark TTS publisher",
    "nvidia": "NVIDIA official models (NeMo)",
    "NousResearch": "Nous Research (Hermes, etc.)",
    "pyannote": "pyannote-audio diarization",
    "openai-community": "OpenAI community-mirror org",
    "sentence-transformers": "Sentence Transformers (embeddings)",
    "BAAI": "Beijing Academy of AI",
    "intfloat": "intfloat (E5 embedding family)",
    "mixedbread-ai": "mixedbread-ai (mxbai embeddings)",
    "stabilityai": "Stability AI (SD, SDXL, Stable Audio)",
    "runwayml": "RunwayML (legacy SD checkpoints)",
    "CompVis": "CompVis (legacy SD checkpoints)",
    "hertz-ai": "Hevolve / HARTOS / Nunba — first-party",
    "HertzAI": "Hevolve / HARTOS / Nunba — first-party (capitalized)",
}
89def _default_path() -> Path:
90 """Default config path: ~/.nunba/hub_allowlist.json (Linux/macOS)
91 or %USERPROFILE%/.nunba/hub_allowlist.json (Windows). Mirrors
92 the location of mcp.token so operators learn one config root."""
93 return Path.home() / '.nunba' / 'hub_allowlist.json'
class HubAllowlist:
    """Persistent, runtime-editable allowlist of trusted HF org publishers.

    Entries live in ``self._entries`` as
    ``{org: {'reason': str, 'added_at': float}}`` and are mirrored to a JSON
    file at ``config_path``.

    Concurrency model:
      * Writers (`add` / `remove`) serialize on ``self._lock`` and never
        mutate the live dict in place; they build a replacement dict and
        rebind ``self._entries``.
      * `is_trusted` therefore stays lock-free: it grabs the current dict
        reference once and iterates a mapping that no writer will modify,
        so it cannot hit "dictionary changed size during iteration".

    Matching is case-insensitive everywhere, and at most one spelling of an
    org is kept per case-folded name so a revoke can never leave a
    differently-cased twin behind.
    """

    def __init__(self, config_path: Optional[Path] = None) -> None:
        """Load the allowlist from *config_path*, seeding defaults if absent.

        Args:
            config_path: Location of the JSON file. Falls back to
                `_default_path()` when omitted or falsy.
        """
        self._path = Path(config_path) if config_path else _default_path()
        self._lock = threading.RLock()
        self._entries: Dict[str, Dict[str, object]] = {}
        self._load_or_seed()

    # ── Persistence ────────────────────────────────────────────────────

    def _load_or_seed(self) -> None:
        """Load from disk, or seed defaults if no usable config exists.

        Accepted on-disk shapes:
          * dict ``{org: {...metadata...}}`` — the current format;
          * dict ``{org: "reason"}`` — value is promoted to a metadata dict;
          * list ``["org", ...]`` — legacy format, non-string items skipped.

        Any failure while reading or parsing is logged and treated as
        "no config": defaults are seeded and written back to disk.
        """
        if self._path.is_file():
            try:
                with self._path.open(encoding='utf-8') as f:
                    raw = json.load(f)
                # Tolerate legacy formats: list-of-strings → minimal entries.
                if isinstance(raw, list):
                    self._entries = {
                        org: {'reason': '(legacy import)', 'added_at': time.time()}
                        for org in raw if isinstance(org, str)
                    }
                elif isinstance(raw, dict):
                    self._entries = {
                        k: dict(v) if isinstance(v, dict)
                        else {'reason': str(v), 'added_at': time.time()}
                        for k, v in raw.items()
                    }
                else:
                    raise ValueError(f"unexpected JSON root type: {type(raw)}")
                return
            except Exception as e:
                # Deliberately broad: a corrupt config must not kill
                # startup. Log loudly and fall through to seeding.
                logger.warning(
                    "hub_allowlist: failed to load %s (%s) — seeding defaults",
                    self._path, e,
                )
        self._seed_defaults()
        self._save()

    def _seed_defaults(self) -> None:
        """Replace the in-memory entries with `DEFAULT_TRUSTED_ORGS`."""
        now = time.time()
        self._entries = {
            org: {'reason': reason, 'added_at': now}
            for org, reason in DEFAULT_TRUSTED_ORGS.items()
        }

    def _save(self) -> None:
        """Write the entries to disk atomically (temp file + `os.replace`).

        Best-effort: an `OSError` is logged as a warning and swallowed, so
        the in-memory state stays authoritative even if the disk write fails.
        """
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            tmp = self._path.with_suffix('.json.tmp')
            with tmp.open('w', encoding='utf-8') as f:
                # Indented + sorted keys keeps diffs small and stable.
                json.dump(self._entries, f, indent=2, sort_keys=True)
            os.replace(tmp, self._path)
        except OSError as e:
            logger.warning("hub_allowlist: save failed (%s)", e)

    # ── Read ───────────────────────────────────────────────────────────

    def is_trusted(self, org: str) -> bool:
        """Return True if *org* matches a stored entry, ignoring case.

        Surrounding whitespace on *org* is ignored. Empty or non-string
        input is never trusted.

        Lock-free by design: writers rebind ``self._entries`` instead of
        mutating it, so the snapshot taken here is safe to iterate.
        """
        if not org or not isinstance(org, str):
            return False
        target = org.strip().lower()
        snapshot = self._entries
        return any(known.lower() == target for known in snapshot)

    def list(self) -> List[Dict[str, object]]:
        """Return every entry as ``{'org', 'reason', 'added_at'}``, sorted by org.

        Missing metadata falls back to ``''`` for the reason and ``0`` for
        the timestamp.
        """
        with self._lock:
            return [
                {
                    'org': org,
                    'reason': info.get('reason', ''),
                    'added_at': info.get('added_at', 0),
                }
                for org, info in sorted(self._entries.items())
            ]

    # ── Write ──────────────────────────────────────────────────────────

    def add(self, org: str, reason: str) -> None:
        """Add (or re-add) an org with a human-readable reason, then persist.

        If the org is already present under any casing, that entry is
        replaced, so the list never holds two spellings of one org.

        Args:
            org: Org name. Surrounding whitespace is stripped; the result
                must be non-empty, ASCII-only, and contain neither ``/``
                nor any whitespace character.
            reason: Non-blank explanation, stored stripped.

        Raises:
            ValueError: If either argument fails validation.
        """
        if not isinstance(org, str) or not org.strip():
            raise ValueError("org must be a non-empty string")
        org_clean = org.strip()
        # isspace() also rejects interior tabs/newlines, not just ' '.
        if '/' in org_clean or any(ch.isspace() for ch in org_clean):
            raise ValueError("org must not contain '/' or whitespace")
        # ASCII-only, as a defense against Unicode homoglyph look-alikes.
        if any(ord(c) > 0x7F for c in org_clean):
            raise ValueError(
                "org must be ASCII-only (Unicode homoglyph defense)"
            )
        if not isinstance(reason, str) or not reason.strip():
            raise ValueError("reason must be a non-empty string")
        folded = org_clean.lower()
        with self._lock:
            # Copy-on-write: drop any case-variant of this org, then rebind.
            updated = {
                key: info for key, info in self._entries.items()
                if key.lower() != folded
            }
            updated[org_clean] = {
                'reason': reason.strip(),
                'added_at': time.time(),
            }
            self._entries = updated
            self._save()

    def remove(self, org: str) -> bool:
        """Remove an org, matching case-insensitively, then persist.

        Every stored key that matches is removed, so a revoke cannot leave
        a differently-cased duplicate trusted.

        Returns:
            True if at least one entry was removed; False if nothing
            matched or *org* was empty / not a string.
        """
        if not org or not isinstance(org, str):
            return False
        target = org.strip().lower()
        with self._lock:
            current = self._entries
            kept = {
                key: info for key, info in current.items()
                if key.lower() != target
            }
            if len(kept) == len(current):
                return False
            self._entries = kept
            self._save()
            return True
# ── Process-wide singleton accessor ──────────────────────────────────────
# The shared instance is created on first use rather than at import time,
# so tests can build their own `HubAllowlist(config_path=...)` (or reset
# the singleton) before anything touches the default path.
# `_INSTANCE_LOCK` guards creation and reset of `_INSTANCE`.
_INSTANCE_LOCK = threading.Lock()
_INSTANCE: Optional[HubAllowlist] = None
def get_allowlist() -> HubAllowlist:
    """Return the process-wide allowlist singleton, creating it on first use.

    Uses double-checked locking: the unlocked check keeps the common path
    cheap, and the re-check under `_INSTANCE_LOCK` ensures only one
    `HubAllowlist()` is constructed when several threads race here.

    The instance is carried in a local so the function always returns a
    real object, even if `reset_for_tests()` clears the global between the
    check and the return.
    """
    global _INSTANCE
    instance = _INSTANCE
    if instance is None:
        with _INSTANCE_LOCK:
            instance = _INSTANCE
            if instance is None:
                instance = _INSTANCE = HubAllowlist()
    return instance
def reset_for_tests() -> None:
    """Drop the cached singleton so the next `get_allowlist()` rebuilds it.

    Intended for test code only, e.g. to make a later call pick up a
    different config location.
    """
    global _INSTANCE
    with _INSTANCE_LOCK:
        # Taken under the same lock that guards creation of the singleton.
        _INSTANCE = None
# Public surface of this module; names not listed here are internal.
__all__ = [
    "DEFAULT_TRUSTED_ORGS",
    "HubAllowlist",
    "get_allowlist",
    "reset_for_tests",
]