Coverage for security / middleware.py: 96.5%

144 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Security Middleware for Flask 

3Applies security headers, CORS, CSRF protection, host validation, and API auth. 

4""" 

5 

6import os 

7import logging 

8from functools import wraps 

9from flask import Flask, request, jsonify, g 

10 

11logger = logging.getLogger('hevolve_security') 

12 

13 

14def apply_security_middleware(app: Flask): 

15 """Apply all security middleware to a Flask app.""" 

16 

17 _apply_security_headers(app) 

18 _apply_cors(app) 

19 _apply_csrf_protection(app) 

20 _apply_host_validation(app) 

21 _apply_api_auth(app) 

22 

23 

24def _apply_security_headers(app: Flask): 

25 """Add security headers to all responses.""" 

26 

27 @app.after_request 

28 def add_security_headers(response): 

29 response.headers['X-Frame-Options'] = 'DENY' 

30 response.headers['X-Content-Type-Options'] = 'nosniff' 

31 response.headers['X-XSS-Protection'] = '1; mode=block' 

32 response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' 

33 response.headers['Permissions-Policy'] = ( 

34 'camera=(), microphone=(), geolocation=(), ' 

35 'payment=(), usb=(), magnetometer=()' 

36 ) 

37 

38 # HSTS only in production 

39 if os.environ.get('HEVOLVE_ENV') != 'development': 

40 response.headers['Strict-Transport-Security'] = ( 

41 'max-age=31536000; includeSubDomains; preload' 

42 ) 

43 response.headers['Content-Security-Policy'] = ( 

44 "default-src 'self'; " 

45 "script-src 'self'; " 

46 "style-src 'self'; " 

47 "img-src 'self' data:; " 

48 "connect-src 'self'; " 

49 "frame-ancestors 'none'" 

50 ) 

51 

52 return response 

53 

54 

55def _apply_cors(app: Flask): 

56 """CORS with explicit origin allowlist. 

57 

58 If CORS_ORIGINS is not set, no origins are allowed (fail-closed). 

59 Set CORS_ORIGINS=* for development only. 

60 """ 

61 raw_origins = os.environ.get('CORS_ORIGINS', '') 

62 allowed_origins = set( 

63 o.strip() for o in raw_origins.split(',') 

64 if o.strip() 

65 ) 

66 if not allowed_origins: 

67 logger.warning( 

68 "CORS_ORIGINS not configured - no cross-origin requests allowed. " 

69 "Set CORS_ORIGINS env var for production (comma-separated origins).") 

70 

71 @app.after_request 

72 def add_cors_headers(response): 

73 origin = request.headers.get('Origin', '') 

74 

75 if origin in allowed_origins: 

76 response.headers['Access-Control-Allow-Origin'] = origin 

77 response.headers['Access-Control-Allow-Methods'] = ( 

78 'GET, POST, PUT, DELETE, PATCH, OPTIONS' 

79 ) 

80 response.headers['Access-Control-Allow-Headers'] = ( 

81 'Content-Type, Authorization, X-API-Key, X-CSRF-Token' 

82 ) 

83 response.headers['Access-Control-Allow-Credentials'] = 'true' 

84 response.headers['Access-Control-Max-Age'] = '600' 

85 

86 return response 

87 

88 @app.before_request 

89 def handle_preflight(): 

90 if request.method == 'OPTIONS': 

91 response = app.make_default_options_response() 

92 origin = request.headers.get('Origin', '') 

93 if origin in allowed_origins: 

94 response.headers['Access-Control-Allow-Origin'] = origin 

95 response.headers['Access-Control-Allow-Methods'] = ( 

96 'GET, POST, PUT, DELETE, PATCH, OPTIONS' 

97 ) 

98 response.headers['Access-Control-Allow-Headers'] = ( 

99 'Content-Type, Authorization, X-API-Key, X-CSRF-Token' 

100 ) 

101 return response 

102 

103 

104def _apply_csrf_protection(app: Flask): 

105 """CSRF protection for state-changing requests.""" 

106 

107 # Paths exempt from CSRF (API-only endpoints using Bearer auth) 

108 CSRF_EXEMPT_PREFIXES = ( 

109 '/a2a/', '/api/social/bots/', '/status', 

110 '/.well-known/', 

111 ) 

112 

113 @app.before_request 

114 def csrf_check(): 

115 if request.method not in ('POST', 'PUT', 'DELETE', 'PATCH'): 

116 return 

117 

118 # Skip for exempt paths 

119 if any(request.path.startswith(p) for p in CSRF_EXEMPT_PREFIXES): 

120 return 

121 

122 # Bearer token auth is inherently CSRF-safe 

123 auth_header = request.headers.get('Authorization', '') 

124 if auth_header.startswith('Bearer '): 

125 return 

126 

127 # API key auth is also CSRF-safe 

128 if request.headers.get('X-API-Key'): 

129 return 

130 

131 # JSON content type with Origin check provides CSRF protection 

132 if request.content_type and 'application/json' in request.content_type: 

133 return 

134 

135 # For non-API requests (forms), require CSRF token 

136 csrf_token = request.headers.get('X-CSRF-Token') 

137 if not csrf_token: 

138 logger.warning(f"CSRF token missing for {request.method} {request.path}") 

139 return jsonify({'error': 'CSRF token required'}), 403 

140 

141 

142def _apply_host_validation(app: Flask): 

143 """Prevent Host header injection.""" 

144 

145 allowed_hosts = set( 

146 h.strip() for h in 

147 os.environ.get('ALLOWED_HOSTS', 'localhost,127.0.0.1').split(',') 

148 if h.strip() 

149 ) 

150 

151 @app.before_request 

152 def validate_host(): 

153 if os.environ.get('HEVOLVE_ENV') == 'development': 

154 return 

155 if os.environ.get('NUNBA_BUNDLED'): 

156 return 

157 

158 host = request.host.split(':')[0] 

159 if host not in allowed_hosts: 

160 logger.warning(f"Rejected request with invalid Host: {host}") 

161 return jsonify({'error': 'Invalid host'}), 400 

162 

163 

164#: Admin operations that modify persistent state. ALWAYS require auth 

165#: regardless of tier — even on a trusted LAN, random devices (IoT, 

166#: guests, kids' tablets) shouldn't be able to hit admin routes. The 

167#: only exception is bundled desktop mode (NUNBA_BUNDLED), which is 

168#: single-user in-process test_client territory with no network exposure. 

169#: 

170#: Each entry is a prefix — anything starting with it is considered an 

171#: admin path. Add new admin routes to this tuple; they inherit the 

172#: auth gate automatically. 

173ADMIN_PATHS = ('/api/admin',) 

174 

175#: User-facing API endpoints. These are guarded only when the deployment 

176#: is publicly exposed (central tier). Regional deployments live on a 

177#: trusted LAN or behind a gateway (KONG, etc.) that handles auth; flat 

178#: deployments are single-user desktop and pre-trusted. 

179NETWORK_PROTECTED_PATHS = ('/chat', '/time_agent', '/visual_agent', 

180 '/add_history', '/prompts', '/zeroshot', 

181 '/response_ack') 

182 

183#: Legacy alias — some tests still import PROTECTED_PATHS expecting 

184#: the combined tuple. Keep this as the union so older imports don't 

185#: break, while the split above drives the new enforcement logic. 

186PROTECTED_PATHS = ADMIN_PATHS + NETWORK_PROTECTED_PATHS 

187 

188EXEMPT_PREFIXES = ('/status', '/a2a/', '/api/social/', '/.well-known/', 

189 '/prompts/public') 

190 

191 

192def _apply_api_auth(app: Flask): 

193 """Tier-aware API authentication with a strict admin guard. 

194 

195 Two gates run in order: 

196 

197 1. ADMIN guard — /api/admin/* ALWAYS requires auth on any tier 

198 except bundled desktop. Admin ops modify persistent state so 

199 LAN trust is not enough — a compromised IoT device on the 

200 same network must not be able to drop agents or reconfigure 

201 TTS engines. 

202 2. NETWORK guard — /chat, /prompts, /visual_agent, etc. are 

203 guarded only on central tier (publicly exposed). Regional 

204 and flat tiers assume LAN trust or gateway-handled auth. 

205 

206 When HEVOLVE_API_KEY is set, BOTH gates accept X-API-Key. When it 

207 is unset, BOTH gates accept only a Bearer JWT. Exempt prefixes 

208 (/status, /a2a/, /api/social/, /.well-known/, /prompts/public) 

209 bypass both gates so health probes and social-media-facing routes 

210 stay public. 

211 

212 Deployment scenarios: 

213 - Behind KONG: KONG handles auth → no key needed, 

214 middleware enforces tier-conditional 

215 only if KONG is bypassed 

216 - Bundled desktop (NUNBA_BUNDLED): early return, always trusted 

217 - Regional LAN: /chat open, /api/admin gated 

218 - Central cloud: everything gated 

219 """ 

220 

221 def _path_matches_any(path: str, prefixes: tuple) -> bool: 

222 return any(path == p or path.startswith(p + '/') for p in prefixes) 

223 

224 def _is_exempt(path: str) -> bool: 

225 return any(path.startswith(p) for p in EXEMPT_PREFIXES) 

226 

227 def _is_admin_path(path: str) -> bool: 

228 return _path_matches_any(path, ADMIN_PATHS) 

229 

230 def _is_network_protected(path: str) -> bool: 

231 return _path_matches_any(path, NETWORK_PROTECTED_PATHS) 

232 

233 def _require_api_key_or_bearer(expected_key: str): 

234 """Return None if the request carries a valid credential, else 

235 a 401 jsonify response. Preference order matches the original 

236 middleware: X-API-Key if configured, otherwise Bearer token.""" 

237 if expected_key: 

238 api_key = request.headers.get('X-API-Key') 

239 if api_key and _constant_time_compare(api_key, expected_key): 

240 return None 

241 # Fall through to Bearer check so API-key-configured deploys 

242 # still accept JWTs (useful for admin UI + k8s probes). 

243 auth_header = request.headers.get('Authorization', '') 

244 if auth_header.startswith('Bearer '): 

245 # Actually decode and verify the JWT — not just check the prefix. 

246 # Without this, `Bearer garbage` passes the admin gate. 

247 _token = auth_header[7:] 

248 try: 

249 from integrations.social.auth import decode_jwt 

250 jwt_payload = decode_jwt(_token) 

251 if jwt_payload: 

252 g.auth_source = 'jwt' 

253 g.jwt_payload = jwt_payload 

254 return None 

255 except Exception: 

256 pass 

257 # Token invalid/expired — reject 

258 return jsonify({'error': 'Invalid or expired Bearer token'}), 401 

259 if expected_key: 

260 logger.warning(f"Invalid/missing credential for {request.path}") 

261 return jsonify( 

262 {'error': 'X-API-Key header or Bearer token required'}, 

263 ), 401 

264 return jsonify( 

265 {'error': 'Authentication required (Bearer token)'}, 

266 ), 401 

267 

268 @app.before_request 

269 def check_api_auth(): 

270 # Bundled/desktop mode: in-process test_client, always trusted. 

271 if os.environ.get('NUNBA_BUNDLED'): 

272 return 

273 

274 path = request.path 

275 if _is_exempt(path): 

276 return 

277 

278 # Resolve the shared credential once — both gates share it. 

279 try: 

280 from security.secrets_manager import get_secret 

281 expected_key = get_secret('HEVOLVE_API_KEY') 

282 except Exception: 

283 expected_key = os.environ.get('HEVOLVE_API_KEY', '') 

284 

285 # Gate 1: Admin paths. ALWAYS required. Even regional LAN 

286 # deployments gate admin ops — the tier model is for user-facing 

287 # APIs, not for operations that mutate persistent state. 

288 if _is_admin_path(path): 

289 resp = _require_api_key_or_bearer(expected_key) 

290 if resp is not None: 

291 return resp 

292 return # Admin path authenticated, skip gate 2 

293 

294 # Gate 2: User-facing API paths. Only enforced on central tier 

295 # (publicly exposed) OR whenever HEVOLVE_API_KEY is explicitly set 

296 # (direct-exposure deployments that opt in to the key layer). 

297 if not _is_network_protected(path): 

298 return # Not in either tuple → public 

299 

300 node_tier = os.environ.get('HEVOLVE_NODE_TIER', 'flat') 

301 if expected_key: 

302 resp = _require_api_key_or_bearer(expected_key) 

303 if resp is not None: 

304 return resp 

305 return 

306 if node_tier == 'central': 

307 resp = _require_api_key_or_bearer(expected_key='') 

308 if resp is not None: 

309 return resp 

310 return 

311 # Non-central without API key → LAN-trusted or gateway-auth'd 

312 return 

313 

314 

315def _constant_time_compare(a: str, b: str) -> bool: 

316 """Constant-time string comparison to prevent timing attacks.""" 

317 import hmac 

318 return hmac.compare_digest(a.encode(), b.encode())