Coverage for security / middleware.py: 96.5%
144 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Security Middleware for Flask
3Applies security headers, CORS, CSRF protection, host validation, and API auth.
4"""
6import os
7import logging
8from functools import wraps
9from flask import Flask, request, jsonify, g
11logger = logging.getLogger('hevolve_security')
def apply_security_middleware(app: Flask):
    """Install every security layer of this module on *app*.

    The installers are invoked one after another in a fixed order:
    response headers, CORS, CSRF protection, Host validation and finally
    API authentication.

    Args:
        app: The Flask application the hooks are registered on.
    """
    # Order is significant only in that it is preserved from the original
    # sequence of calls; each installer registers its own request hooks.
    installers = (
        _apply_security_headers,
        _apply_cors,
        _apply_csrf_protection,
        _apply_host_validation,
        _apply_api_auth,
    )
    for install in installers:
        install(app)
def _apply_security_headers(app: Flask):
    """Register an ``after_request`` hook that stamps hardening headers.

    Args:
        app: The Flask application the hook is registered on.
    """
    # Headers written onto every response, whatever the environment.
    unconditional = {
        'X-Frame-Options': 'DENY',
        'X-Content-Type-Options': 'nosniff',
        'X-XSS-Protection': '1; mode=block',
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        'Permissions-Policy': (
            'camera=(), microphone=(), geolocation=(), '
            'payment=(), usb=(), magnetometer=()'
        ),
    }

    # Headers written only when HEVOLVE_ENV is not 'development'.
    # NOTE(review): in SOURCE the CSP assignment directly follows the HSTS
    # assignment with no separating blank line, so it is treated here as
    # part of the same non-development branch - confirm against the
    # original indentation.
    outside_development = {
        'Strict-Transport-Security': (
            'max-age=31536000; includeSubDomains; preload'
        ),
        'Content-Security-Policy': (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self'; "
            "img-src 'self' data:; "
            "connect-src 'self'; "
            "frame-ancestors 'none'"
        ),
    }

    @app.after_request
    def add_security_headers(response):
        for header_name, header_value in unconditional.items():
            response.headers[header_name] = header_value

        # The environment is re-read on every response, as before.
        in_development = os.environ.get('HEVOLVE_ENV') == 'development'
        if not in_development:
            for header_name, header_value in outside_development.items():
                response.headers[header_name] = header_value

        return response
def _apply_cors(app: Flask):
    """Install CORS handling driven by an explicit origin allowlist.

    ``CORS_ORIGINS`` is read once, when this function runs, as a
    comma-separated list of origins (surrounding whitespace is stripped,
    empty entries are dropped). If it is unset or empty, no origin is
    allowed (fail-closed) and a warning is logged.

    Matching is an exact string comparison between the request's
    ``Origin`` header and the allowlist, so an entry of ``*`` does NOT act
    as a wildcard - list each development origin explicitly.

    Two hooks are registered:

    * ``after_request``: reflects an allowlisted origin and adds the
      method/header/credentials/max-age grants.
    * ``before_request``: answers every ``OPTIONS`` request with Flask's
      default options response, plus the origin/method/header grants when
      the origin is allowlisted.

    Because the granted origin is echoed from the request, ``Vary: Origin``
    is added alongside it so caches do not reuse one origin's grant for
    another origin.

    Args:
        app: The Flask application the hooks are registered on.
    """
    raw_origins = os.environ.get('CORS_ORIGINS', '')
    allowed_origins = {o.strip() for o in raw_origins.split(',') if o.strip()}
    if not allowed_origins:
        logger.warning(
            "CORS_ORIGINS not configured - no cross-origin requests allowed. "
            "Set CORS_ORIGINS env var for production (comma-separated origins).")

    allow_methods = 'GET, POST, PUT, DELETE, PATCH, OPTIONS'
    allow_headers = 'Content-Type, Authorization, X-API-Key, X-CSRF-Token'

    def _grant_if_allowed(response) -> bool:
        """Add the shared CORS grants to *response* for an allowlisted
        Origin. Return True when the grants were added, else False."""
        origin = request.headers.get('Origin', '')
        if origin not in allowed_origins:
            return False
        response.headers['Access-Control-Allow-Origin'] = origin
        response.headers['Access-Control-Allow-Methods'] = allow_methods
        response.headers['Access-Control-Allow-Headers'] = allow_headers
        # The response now depends on the request's Origin header; tell
        # caches so. HeaderSet.add is a no-op if 'Origin' is already listed.
        response.vary.add('Origin')
        return True

    @app.after_request
    def add_cors_headers(response):
        if _grant_if_allowed(response):
            response.headers['Access-Control-Allow-Credentials'] = 'true'
            response.headers['Access-Control-Max-Age'] = '600'
        return response

    @app.before_request
    def handle_preflight():
        if request.method != 'OPTIONS':
            return None
        # Returning a response from a before_request hook makes Flask use
        # it as the reply instead of dispatching to the view.
        response = app.make_default_options_response()
        _grant_if_allowed(response)
        return response
def _apply_csrf_protection(app: Flask):
    """Register a ``before_request`` hook guarding state-changing requests.

    POST/PUT/DELETE/PATCH requests are let through when any of these hold:

    * the path starts with one of ``CSRF_EXEMPT_PREFIXES``;
    * an ``Authorization: Bearer ...`` header is present;
    * a non-empty ``X-API-Key`` header is present;
    * the parsed media type of the body is exactly ``application/json``;
    * a non-empty ``X-CSRF-Token`` header is present (presence only - the
      value is not validated here).

    Otherwise the hook answers ``403`` with ``{'error': 'CSRF token
    required'}``.

    Args:
        app: The Flask application the hook is registered on.
    """
    # Paths exempt from CSRF (API-only endpoints using Bearer auth)
    CSRF_EXEMPT_PREFIXES = (
        '/a2a/', '/api/social/bots/', '/status',
        '/.well-known/',
    )

    @app.before_request
    def csrf_check():
        if request.method not in ('POST', 'PUT', 'DELETE', 'PATCH'):
            return None

        # str.startswith accepts a tuple: true if any prefix matches.
        if request.path.startswith(CSRF_EXEMPT_PREFIXES):
            return None

        # Custom auth headers are not attached by browsers automatically,
        # so a request carrying one was not forged via ambient cookies.
        if request.headers.get('Authorization', '').startswith('Bearer '):
            return None
        if request.headers.get('X-API-Key'):
            return None

        # A JSON body forces a CORS preflight for cross-origin browser
        # requests. Compare the parsed media type, not a substring of the
        # raw header: "text/plain; application/json" is still text/plain
        # (no preflight needed) and must NOT be treated as JSON.
        if request.mimetype == 'application/json':
            return None

        # For non-API requests (forms), require CSRF token
        if not request.headers.get('X-CSRF-Token'):
            logger.warning(
                "CSRF token missing for %s %s", request.method, request.path)
            return jsonify({'error': 'CSRF token required'}), 403
        return None
def _apply_host_validation(app: Flask):
    """Register a ``before_request`` hook rejecting unexpected Host values.

    ``ALLOWED_HOSTS`` (comma-separated, default ``localhost,127.0.0.1``) is
    read once, when this function runs. Entries and the incoming host are
    compared case-insensitively. IPv6 literals may be listed with or
    without their square brackets (``[::1]`` or ``::1``).

    The check is skipped when ``HEVOLVE_ENV`` is ``development`` or when
    ``NUNBA_BUNDLED`` is set to a non-empty value; both are re-read on
    every request. A rejected request gets ``400`` with
    ``{'error': 'Invalid host'}``.

    Args:
        app: The Flask application the hook is registered on.
    """
    allowed_hosts = {
        h.strip().lower()
        for h in os.environ.get('ALLOWED_HOSTS', 'localhost,127.0.0.1').split(',')
        if h.strip()
    }

    def _strip_port(raw_host: str) -> str:
        """Return *raw_host* without its ``:port`` suffix, keeping the
        brackets of an IPv6 literal intact."""
        if raw_host.startswith('['):
            # '[::1]:5000' -> '[::1]'. Splitting on the first ':' would
            # yield '[' here, which could never match any allowlist entry.
            closing = raw_host.find(']')
            return raw_host[:closing + 1] if closing != -1 else raw_host
        return raw_host.split(':', 1)[0]

    @app.before_request
    def validate_host():
        if os.environ.get('HEVOLVE_ENV') == 'development':
            return None
        if os.environ.get('NUNBA_BUNDLED'):
            return None

        host = _strip_port(request.host).lower()
        # Accept an IPv6 literal whether the allowlist spells it with or
        # without brackets.
        if host in allowed_hosts or host.strip('[]') in allowed_hosts:
            return None

        logger.warning("Rejected request with invalid Host: %s", host)
        return jsonify({'error': 'Invalid host'}), 400
#: Prefixes of admin routes, i.e. operations that modify persistent state.
#: These ALWAYS require auth regardless of tier: even on a trusted LAN,
#: arbitrary devices (IoT, guests, kids' tablets) must not reach admin
#: routes. The single exception is bundled desktop mode (NUNBA_BUNDLED),
#: which is single-user, in-process test_client usage with no network
#: exposure.
#:
#: Anything that starts with an entry counts as an admin path, so a new
#: admin route only needs to be appended here to inherit the auth gate.
ADMIN_PATHS = (
    '/api/admin',
)

#: User-facing API endpoints. Guarded only when the deployment is publicly
#: exposed (central tier). Regional deployments sit on a trusted LAN or
#: behind a gateway (KONG, etc.) that handles auth; flat deployments are
#: single-user desktop and pre-trusted.
NETWORK_PROTECTED_PATHS = (
    '/chat',
    '/time_agent',
    '/visual_agent',
    '/add_history',
    '/prompts',
    '/zeroshot',
    '/response_ack',
)

#: Legacy alias: some tests still import PROTECTED_PATHS and expect the
#: combined tuple. It stays the union (admin entries first) so older
#: imports keep working, while the two tuples above drive enforcement.
PROTECTED_PATHS = (*ADMIN_PATHS, *NETWORK_PROTECTED_PATHS)

#: Path prefixes that bypass the API auth gates.
EXEMPT_PREFIXES = (
    '/status',
    '/a2a/',
    '/api/social/',
    '/.well-known/',
    '/prompts/public',
)
def _apply_api_auth(app: Flask):
    """Tier-aware API authentication with a strict admin guard.

    Two gates run in order:

    1. ADMIN guard - /api/admin/* ALWAYS requires auth on any tier
       except bundled desktop. Admin ops modify persistent state so
       LAN trust is not enough - a compromised IoT device on the
       same network must not be able to drop agents or reconfigure
       TTS engines.
    2. NETWORK guard - /chat, /prompts, /visual_agent, etc. are
       guarded on central tier (publicly exposed) or whenever
       HEVOLVE_API_KEY is set. Regional and flat tiers without a key
       assume LAN trust or gateway-handled auth.

    When HEVOLVE_API_KEY is set, BOTH gates accept X-API-Key or a
    verified Bearer JWT. When it is unset, BOTH gates accept only a
    Bearer JWT. Exempt prefixes (/status, /a2a/, /api/social/,
    /.well-known/, /prompts/public) bypass both gates so health probes
    and social-media-facing routes stay public.

    Deployment scenarios:
      - Behind KONG:                   KONG handles auth -> no key needed,
                                       middleware enforces tier-conditional
                                       only if KONG is bypassed
      - Bundled desktop (NUNBA_BUNDLED): early return, always trusted
      - Regional LAN:                  /chat open, /api/admin gated
      - Central cloud:                 everything gated

    Args:
        app: The Flask application the hook is registered on.
    """

    def _path_matches_any(path: str, prefixes: tuple) -> bool:
        # Segment-aware match: '/chat' matches '/chat' and '/chat/x' but
        # not '/chatter'.
        return any(path == p or path.startswith(p + '/') for p in prefixes)

    def _is_exempt(path: str) -> bool:
        # Plain prefix match (no segment boundary), unlike the gates.
        return any(path.startswith(p) for p in EXEMPT_PREFIXES)

    def _is_admin_path(path: str) -> bool:
        return _path_matches_any(path, ADMIN_PATHS)

    def _is_network_protected(path: str) -> bool:
        return _path_matches_any(path, NETWORK_PROTECTED_PATHS)

    def _decode_bearer(token: str):
        """Return the decoded JWT payload, or None when verification fails.

        Failures are logged rather than silently dropped so that a broken
        or missing verifier is distinguishable from a bad token. The token
        itself is never logged.
        """
        try:
            from integrations.social.auth import decode_jwt
            return decode_jwt(token)
        except ImportError:
            logger.error(
                "JWT verifier unavailable; rejecting Bearer token for %s",
                request.path)
        except Exception as exc:
            logger.warning(
                "Bearer token verification failed for %s (%s)",
                request.path, type(exc).__name__)
        return None

    def _require_api_key_or_bearer(expected_key: str):
        """Return None if the request carries a valid credential, else a
        ``(response, 401)`` pair. X-API-Key is tried first when a key is
        configured; a Bearer token is accepted in either case."""
        if expected_key:
            api_key = request.headers.get('X-API-Key')
            if api_key and _constant_time_compare(api_key, expected_key):
                return None
            # Fall through to Bearer check so API-key-configured deploys
            # still accept JWTs (useful for admin UI + k8s probes).

        auth_header = request.headers.get('Authorization', '')
        if auth_header.startswith('Bearer '):
            # Actually decode and verify the JWT - not just check the
            # prefix. Without this, `Bearer garbage` passes the admin gate.
            jwt_payload = _decode_bearer(auth_header[7:])
            if jwt_payload:
                g.auth_source = 'jwt'
                g.jwt_payload = jwt_payload
                return None
            # Token invalid/expired - reject
            return jsonify({'error': 'Invalid or expired Bearer token'}), 401

        if expected_key:
            logger.warning(
                "Invalid/missing credential for %s", request.path)
            return jsonify(
                {'error': 'X-API-Key header or Bearer token required'},
            ), 401
        return jsonify(
            {'error': 'Authentication required (Bearer token)'},
        ), 401

    @app.before_request
    def check_api_auth():
        # Bundled/desktop mode: in-process test_client, always trusted.
        if os.environ.get('NUNBA_BUNDLED'):
            return None

        path = request.path
        if _is_exempt(path):
            return None

        # Resolve the shared credential once - both gates share it.
        # Falling back to the environment is deliberate best-effort when
        # the secrets manager cannot be imported or raises.
        try:
            from security.secrets_manager import get_secret
            expected_key = get_secret('HEVOLVE_API_KEY')
        except Exception:
            expected_key = os.environ.get('HEVOLVE_API_KEY', '')

        # Gate 1: Admin paths. ALWAYS required. Even regional LAN
        # deployments gate admin ops - the tier model is for user-facing
        # APIs, not for operations that mutate persistent state.
        if _is_admin_path(path):
            # None (authenticated) lets the request proceed and skips
            # gate 2; otherwise the 401 pair is returned to Flask.
            return _require_api_key_or_bearer(expected_key)

        # Gate 2: User-facing API paths. Only enforced on central tier
        # (publicly exposed) OR whenever HEVOLVE_API_KEY is explicitly set
        # (direct-exposure deployments that opt in to the key layer).
        if not _is_network_protected(path):
            return None  # Not in either tuple -> public

        node_tier = os.environ.get('HEVOLVE_NODE_TIER', 'flat')
        if expected_key or node_tier == 'central':
            # With no key configured this degrades to Bearer-only.
            return _require_api_key_or_bearer(expected_key or '')

        # Non-central without API key -> LAN-trusted or gateway-auth'd
        return None
315def _constant_time_compare(a: str, b: str) -> bool:
316 """Constant-time string comparison to prevent timing attacks."""
317 import hmac
318 return hmac.compare_digest(a.encode(), b.encode())