Coverage for core / auth_local.py: 82.7%

75 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Localhost-or-token decorator for HARTOS admin/control routes. 

2 

3Port of the canonical Nunba `routes/auth.py:require_local_or_token` 

4pattern into HARTOS, where multiple endpoints (vlm_stop, prompts/sync, 

5diag) need the same "trusted localhost OR valid Bearer token" semantic. 

6 

7Why this pattern instead of plain @require_auth: 

8 Nunba's bundled install runs HARTOS on 127.0.0.1:5000 and has the 

9 desktop tray (Tk indicator window, Python `app.py` / `main.py`) 

10 POST to /api/vlm/stop directly — without a logged-in JWT context. 

11 Adding plain @require_auth would break that user flow on every 

12 Stop-button click. This decorator preserves the local-trust UX 

13 while still rejecting remote unauthenticated callers. 

14 

15Threat model coverage: 

16 ✓ Remote attacker on the LAN — rejected (remote_addr != localhost) 

17 ✓ Remote attacker via DNS rebind — rejected (post-rebind remote_addr 

18 is still the attacker's IP, not localhost) 

19 ✓ Browser CSRF from same-origin localhost page — accepted (correct; 

20 that's the intended Nunba SPA flow) 

21 ✓ Browser CSRF from cross-origin page targeting localhost — REJECTED 

22 when the destructive endpoint uses ``@require_local_or_token_csrf_safe`` 

23 (Phase 9.5 hardening). The Origin/Referer header check distinguishes 

24 a same-origin SPA POST from a cross-origin attack page POST that 

25 happens to land on remote_addr=127.0.0.1. 

26 

27Env vars: 

28 HARTOS_API_TOKEN — optional shared secret. When set, callers may 

29 send `Authorization: Bearer <token>` to bypass the localhost check. 

30 Used by remote ops tooling and inter-node admin calls. 

31 

32 TRUSTED_PROXY — when HARTOS sits behind a reverse proxy (nginx, 

33 Traefik), all requests appear as remote_addr=127.0.0.1 by default. 

34 Setting this env to the proxy's address makes the decorator inspect 

35 X-Forwarded-For instead. Without it, only direct-connection 

36 remote_addr is trusted (safe default). 

37 

38 HARTOS_TRUSTED_ORIGINS — comma-separated list of origins that are 

39 additionally treated as same-origin for the CSRF check (e.g. 

40 "https://nunba.local,https://hevolve.ai"). Loopback origins 

41 (http://localhost, http://127.0.0.1, http://[::1]) are always 

42 accepted regardless. 

43""" 

44from __future__ import annotations 

45 

46import hmac 

47import os 

48from functools import wraps 

49from urllib.parse import urlparse 

50 

51from flask import jsonify, request 

52 

53# Read once at import time (not per-request) so token rotation requires 

54# a HARTOS restart — same model as Nunba. 

55API_TOKEN = os.environ.get('HARTOS_API_TOKEN', '') 

56 

57 

58def _is_local_request() -> bool: 

59 """True if the request is from localhost, honouring TRUSTED_PROXY.""" 

60 trusted_proxy = os.environ.get('TRUSTED_PROXY', '') 

61 if trusted_proxy and request.remote_addr == trusted_proxy: 

62 forwarded_for = (request.headers.get('X-Forwarded-For', '') 

63 .split(',')[0].strip()) 

64 return forwarded_for in ('127.0.0.1', '::1', 'localhost') 

65 return request.remote_addr in ('127.0.0.1', '::1') 

66 

67 

68# ── CSRF defense-in-depth (Phase 9.5) ────────────────────────────── 

69 

70 

71_LOOPBACK_HOSTS = ('localhost', '127.0.0.1', '::1', '[::1]') 

72 

73 

74def _origin_host(origin_value: str) -> str: 

75 """Parse an Origin/Referer URL and return just the lowercased host 

76 (without port). Returns '' on malformed input.""" 

77 if not origin_value: 

78 return '' 

79 try: 

80 parsed = urlparse(origin_value) 

81 return (parsed.hostname or '').lower() 

82 except Exception: 

83 return '' 

84 

85 

86def _is_safe_csrf_origin() -> bool: 

87 """Return True iff the request's Origin/Referer header matches the 

88 set of trusted same-origin sources for state-changing destructive 

89 endpoints. 

90 

91 Decision rules: 

92 1. Both Origin AND Referer absent → ACCEPT (non-browser client; 

93 curl, native desktop, Python requests). Browser-driven CSRF 

94 attacks always send at least Origin or Referer — they cannot 

95 be suppressed by an attacker page. 

96 2. If Origin is present, its host MUST be loopback OR the 

97 request's own Host OR a HARTOS_TRUSTED_ORIGINS entry. 

98 3. If only Referer is present (older browsers / some Electron 

99 paths), apply the same host check to its hostname. 

100 

101 This closes the same-machine cross-origin browser CSRF gap noted 

102 in the module docstring. Wrapping a route with 

103 ``@require_local_or_token_csrf_safe`` activates this gate; routes 

104 that keep the original ``@require_local_or_token`` are unchanged. 

105 """ 

106 origin_raw = request.headers.get('Origin', '').strip() 

107 referer_raw = request.headers.get('Referer', '').strip() 

108 if not origin_raw and not referer_raw: 

109 # Browsers always send at least Origin on cross-origin POST 

110 # (the spec requires it). Absence means the request came from 

111 # a non-browser client — curl, native desktop, server-to-server. 

112 # require_local_or_token has already established it's localhost 

113 # or an authenticated token holder. 

114 return True 

115 

116 # Build the allowed host set. 

117 own_host = _origin_host(request.host_url) 

118 trusted_extra = os.environ.get('HARTOS_TRUSTED_ORIGINS', '') 

119 extra_hosts = set() 

120 for entry in trusted_extra.split(','): 

121 entry = entry.strip() 

122 if entry: 

123 host = _origin_host(entry) 

124 if host: 

125 extra_hosts.add(host) 

126 

127 def _host_allowed(host: str) -> bool: 

128 if not host: 

129 return False 

130 if host in _LOOPBACK_HOSTS: 

131 return True 

132 if own_host and host == own_host: 

133 return True 

134 if host in extra_hosts: 

135 return True 

136 return False 

137 

138 # Origin takes priority — when present, it's the authoritative 

139 # signal (browsers send a literal "null" string for opaque origins 

140 # like file://; that fails _host_allowed correctly). 

141 if origin_raw: 

142 return _host_allowed(_origin_host(origin_raw)) 

143 # Fall through: Referer-only path. 

144 return _host_allowed(_origin_host(referer_raw)) 

145 

146 

147def require_local_or_token(f): 

148 """Allow localhost callers; require Bearer token for remote callers. 

149 

150 Returns 401 with a clear message when neither condition holds — the 

151 error body is JSON to match the rest of the HARTOS API surface so 

152 the React SPA can surface it via its existing error toast pipeline. 

153 """ 

154 @wraps(f) 

155 def decorated(*args, **kwargs): 

156 if _is_local_request(): 

157 return f(*args, **kwargs) 

158 if API_TOKEN: 

159 auth = request.headers.get('Authorization', '') 

160 if auth.startswith('Bearer '): 

161 token = auth[7:] 

162 # hmac.compare_digest is constant-time — defends against 

163 # timing-oracle leaks on the token comparison. 

164 if hmac.compare_digest(token, API_TOKEN): 

165 return f(*args, **kwargs) 

166 return jsonify({ 

167 'error': 'unauthorized', 

168 'message': ('This endpoint requires local access or a ' 

169 'valid HARTOS_API_TOKEN bearer header.'), 

170 }), 401 

171 return decorated 

172 

173 

174def require_local_or_token_csrf_safe(f): 

175 """Same gate as ``require_local_or_token`` PLUS an Origin/Referer 

176 header check that rejects cross-origin browser POSTs targeting 

177 localhost. 

178 

179 Use this on DESTRUCTIVE state-changing endpoints (vlm_stop, 

180 config writes, anything that bulk-mutates server state) — the 

181 extra check costs one header lookup and closes the same-machine 

182 browser CSRF gap. 

183 

184 Read-only / non-destructive endpoints SHOULD keep 

185 ``require_local_or_token`` to avoid breaking the curl/native 

186 desktop UX flows that don't send Origin headers. 

187 

188 Authenticated callers (Bearer token) bypass the CSRF check — 

189 they've already proven possession of the shared secret, which a 

190 browser CSRF attacker can't replay. This preserves the remote 

191 ops + inter-node admin path. 

192 """ 

193 @wraps(f) 

194 def decorated(*args, **kwargs): 

195 # Authenticated bearer-token callers skip the CSRF gate — 

196 # token possession is itself proof the caller isn't a 

197 # cross-origin browser context. 

198 if API_TOKEN: 

199 auth = request.headers.get('Authorization', '') 

200 if auth.startswith('Bearer '): 

201 token = auth[7:] 

202 if hmac.compare_digest(token, API_TOKEN): 

203 return f(*args, **kwargs) 

204 # Local callers must additionally pass the CSRF check. 

205 if _is_local_request(): 

206 if _is_safe_csrf_origin(): 

207 return f(*args, **kwargs) 

208 return jsonify({ 

209 'error': 'forbidden', 

210 'message': ('Cross-origin browser request rejected. ' 

211 'This endpoint requires same-origin POST or ' 

212 'a HARTOS_API_TOKEN bearer header.'), 

213 }), 403 

214 return jsonify({ 

215 'error': 'unauthorized', 

216 'message': ('This endpoint requires local access or a ' 

217 'valid HARTOS_API_TOKEN bearer header.'), 

218 }), 401 

219 return decorated