Coverage for core / auth_local.py: 82.7%
75 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Localhost-or-token decorator for HARTOS admin/control routes.
3Port of the canonical Nunba `routes/auth.py:require_local_or_token`
4pattern into HARTOS, where multiple endpoints (vlm_stop, prompts/sync,
5diag) need the same "trusted localhost OR valid Bearer token" semantic.
7Why this pattern instead of plain @require_auth:
8 Nunba's bundled install runs HARTOS on 127.0.0.1:5000 and has the
9 desktop tray (Tk indicator window, Python `app.py` / `main.py`)
10 POST to /api/vlm/stop directly — without a logged-in JWT context.
11 Adding plain @require_auth would break that user flow on every
12 Stop-button click. This decorator preserves the local-trust UX
13 while still rejecting remote unauthenticated callers.
15Threat model coverage:
16 ✓ Remote attacker on the LAN — rejected (remote_addr != localhost)
17 ✓ Remote attacker via DNS rebind — rejected (post-rebind remote_addr
18 is still the attacker's IP, not localhost)
19 ✓ Browser CSRF from same-origin localhost page — accepted (correct;
20 that's the intended Nunba SPA flow)
21 ✓ Browser CSRF from cross-origin page targeting localhost — REJECTED
22 when the destructive endpoint uses ``@require_local_or_token_csrf_safe``
23 (Phase 9.5 hardening). The Origin/Referer header check distinguishes
24 a same-origin SPA POST from a cross-origin attack page POST that
25 happens to land on remote_addr=127.0.0.1.
27Env vars:
28 HARTOS_API_TOKEN — optional shared secret. When set, callers may
29 send `Authorization: Bearer <token>` to bypass the localhost check.
30 Used by remote ops tooling and inter-node admin calls.
32 TRUSTED_PROXY — when HARTOS sits behind a reverse proxy (nginx,
33 Traefik), all requests appear as remote_addr=127.0.0.1 by default.
34 Setting this env to the proxy's address makes the decorator inspect
35 X-Forwarded-For instead. Without it, only direct-connection
36 remote_addr is trusted (safe default).
38 HARTOS_TRUSTED_ORIGINS — comma-separated list of origins that are
39 additionally treated as same-origin for the CSRF check (e.g.
40 "https://nunba.local,https://hevolve.ai"). Loopback origins
41 (http://localhost, http://127.0.0.1, http://[::1]) are always
42 accepted regardless.
43"""
44from __future__ import annotations
46import hmac
47import os
48from functools import wraps
49from urllib.parse import urlparse
51from flask import jsonify, request
# Resolved a single time when this module is imported, never per request,
# so rotating the token means restarting HARTOS — same model as Nunba.
# An unset variable yields '' (token auth disabled).
API_TOKEN = os.getenv('HARTOS_API_TOKEN', '')
def _is_local_request() -> bool:
    """Return True if the request originates from localhost.

    Without ``TRUSTED_PROXY`` (or when the direct peer is not that proxy)
    only the socket peer address ``request.remote_addr`` is consulted.

    When ``TRUSTED_PROXY`` is set and the direct peer IS that proxy, the
    client address is taken from ``X-Forwarded-For`` instead. Only the
    RIGHTMOST entry of that header is used: a proxy that appends to the
    header writes the address it actually saw last, while everything to
    the left was supplied by the client and can be forged (e.g. a remote
    caller sending ``X-Forwarded-For: 127.0.0.1``). A missing or empty
    header yields False.

    Returns:
        bool: True for a loopback caller, False otherwise.
    """
    loopback_addrs = ('127.0.0.1', '::1')
    trusted_proxy = os.environ.get('TRUSTED_PROXY', '')
    if trusted_proxy and request.remote_addr == trusted_proxy:
        forwarded_chain = request.headers.get('X-Forwarded-For', '')
        # Rightmost hop = the address appended by the trusted proxy.
        # The leftmost hop is client-controlled and must not be trusted.
        client_addr = forwarded_chain.split(',')[-1].strip()
        # 'localhost' stays accepted for backward compatibility with the
        # previous allow-list on this path.
        return client_addr in loopback_addrs + ('localhost',)
    return request.remote_addr in loopback_addrs
# ── CSRF defense-in-depth (Phase 9.5) ──────────────────────────────
# Host names that the CSRF origin check always accepts, independent of
# the request's own Host and of HARTOS_TRUSTED_ORIGINS.
_LOOPBACK_HOSTS = (
    'localhost',
    '127.0.0.1',
    '::1',
    '[::1]',
)
def _origin_host(origin_value: str) -> str:
    """Extract the lowercased host name from an Origin/Referer style URL.

    The port, scheme, path and any IPv6 brackets are dropped (this is what
    ``urlparse(...).hostname`` yields).

    Args:
        origin_value: Header value or URL to parse; may be empty.

    Returns:
        The lowercased host, or ``''`` when the value is empty, carries no
        host component (e.g. the literal ``"null"``), or cannot be parsed.
    """
    host = ''
    if origin_value:
        try:
            host = (urlparse(origin_value).hostname or '').lower()
        except Exception:
            # Fail closed: an unparseable value is treated as "no host",
            # which the callers reject.
            host = ''
    return host
def _is_safe_csrf_origin() -> bool:
    """Decide whether the request's Origin/Referer is a trusted source.

    Used as the extra gate for state-changing destructive endpoints.

    Rules, in order:
      1. Neither ``Origin`` nor ``Referer`` is present → accept. Such a
         request is treated as coming from a non-browser client (curl,
         native desktop, Python requests).
      2. ``Origin`` present → its host must be a loopback name, the
         request's own host, or a host listed in
         ``HARTOS_TRUSTED_ORIGINS``. ``Referer`` is ignored in this case.
      3. Only ``Referer`` present → the same host test is applied to the
         Referer URL's host.

    Hosts are compared without port or scheme. Entries of
    ``HARTOS_TRUSTED_ORIGINS`` that yield no host when parsed as a URL
    (for example a bare ``example.com`` without a scheme) are ignored.

    NOTE(review): the "own host" comes from ``request.host_url``, which
    presumably mirrors the client-supplied Host header — confirm whether
    a DNS-rebinding page (Origin host equal to the Host header) is meant
    to pass rule 2.

    Returns:
        bool: True when the request may proceed, False when it must be
        rejected as cross-origin.
    """
    headers = request.headers
    origin_header = headers.get('Origin', '').strip()
    referer_header = headers.get('Referer', '').strip()

    # Origin is authoritative whenever it is present; Referer is only a
    # fallback. (A literal "null" Origin parses to no host and is denied.)
    presented = origin_header or referer_header
    if not presented:
        # No browser provenance headers at all: non-browser client. The
        # surrounding decorator is responsible for the localhost/token
        # decision.
        return True

    # Gather everything that counts as "same origin" for this request.
    server_host = _origin_host(request.host_url)
    configured = os.environ.get('HARTOS_TRUSTED_ORIGINS', '')
    stripped_entries = (chunk.strip() for chunk in configured.split(','))
    trusted_hosts = {_origin_host(item) for item in stripped_entries if item}
    trusted_hosts.discard('')  # entries that did not parse to a host

    caller_host = _origin_host(presented)
    if not caller_host:
        return False
    return (
        caller_host in _LOOPBACK_HOSTS
        or (bool(server_host) and caller_host == server_host)
        or caller_host in trusted_hosts
    )
def require_local_or_token(f):
    """Allow localhost callers; require a Bearer token for remote callers.

    Decision order for every request to the wrapped view:
      1. ``_is_local_request()`` is true → the view runs.
      2. ``API_TOKEN`` is non-empty and the ``Authorization`` header is
         ``Bearer <token>`` with a token equal to ``API_TOKEN`` → the
         view runs.
      3. Otherwise → a JSON ``unauthorized`` body with HTTP status 401
         (JSON to match the rest of the HARTOS API surface).

    Args:
        f: The Flask view function to protect.

    Returns:
        The wrapped view; ``f``'s name and docstring are preserved via
        ``functools.wraps``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if _is_local_request():
            return f(*args, **kwargs)
        if API_TOKEN:
            auth = request.headers.get('Authorization', '')
            if auth.startswith('Bearer '):
                token = auth[7:]
                # hmac.compare_digest is constant-time — defends against
                # timing-oracle leaks on the token comparison. Compare as
                # bytes: with str arguments it raises TypeError on any
                # non-ASCII character, which would let a remote caller
                # turn a bad token into an unhandled error instead of 401.
                if hmac.compare_digest(token.encode('utf-8'),
                                       API_TOKEN.encode('utf-8')):
                    return f(*args, **kwargs)
        return jsonify({
            'error': 'unauthorized',
            'message': ('This endpoint requires local access or a '
                        'valid HARTOS_API_TOKEN bearer header.'),
        }), 401
    return decorated
def require_local_or_token_csrf_safe(f):
    """Same gate as ``require_local_or_token`` PLUS an Origin/Referer
    check that rejects cross-origin browser POSTs targeting localhost.

    Decision order for every request to the wrapped view:
      1. ``API_TOKEN`` is non-empty and the ``Authorization`` header is
         ``Bearer <token>`` with a matching token → the view runs and the
         CSRF check is skipped (a cross-origin browser page cannot supply
         the shared secret).
      2. ``_is_local_request()`` is true → the view runs only if
         ``_is_safe_csrf_origin()`` also passes; otherwise a JSON
         ``forbidden`` body with HTTP status 403 is returned.
      3. Otherwise → a JSON ``unauthorized`` body with HTTP status 401.

    Intended for DESTRUCTIVE state-changing endpoints (vlm_stop, config
    writes, anything that bulk-mutates server state). Read-only or
    non-destructive endpoints can keep ``require_local_or_token``.

    Args:
        f: The Flask view function to protect.

    Returns:
        The wrapped view; ``f``'s name and docstring are preserved via
        ``functools.wraps``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # Authenticated bearer-token callers skip the CSRF gate —
        # token possession is itself proof the caller isn't a
        # cross-origin browser context.
        if API_TOKEN:
            auth = request.headers.get('Authorization', '')
            if auth.startswith('Bearer '):
                token = auth[7:]
                # Constant-time comparison on bytes: compare_digest with
                # str arguments raises TypeError on non-ASCII input, so a
                # malformed token must not reach it as str.
                if hmac.compare_digest(token.encode('utf-8'),
                                       API_TOKEN.encode('utf-8')):
                    return f(*args, **kwargs)
        # Local callers must additionally pass the CSRF check.
        if _is_local_request():
            if _is_safe_csrf_origin():
                return f(*args, **kwargs)
            return jsonify({
                'error': 'forbidden',
                'message': ('Cross-origin browser request rejected. '
                            'This endpoint requires same-origin POST or '
                            'a HARTOS_API_TOKEN bearer header.'),
            }), 403
        return jsonify({
            'error': 'unauthorized',
            'message': ('This endpoint requires local access or a '
                        'valid HARTOS_API_TOKEN bearer header.'),
        }), 401
    return decorated