Coverage for integrations / social / auth.py: 70.8%

226 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Authentication 

3JWT token generation, password hashing, and @require_auth decorator. 

4Uses security.jwt_manager for hardened token management. 

5""" 

6import hashlib 

7import hmac 

8import os 

9import sys 

10import stat 

11import secrets 

12import logging 

13import time 

14from functools import wraps 

15 

16from flask import request, g, jsonify 

17 

18try: 

19 import jwt as pyjwt 

20 HAS_JWT = True 

21except ImportError: 

22 HAS_JWT = False 

23 

24logger = logging.getLogger('hevolve_social') 

25 

26# Use hardened JWT manager when available 

27_jwt_manager = None 

28 

29def _get_jwt_manager(): 

30 global _jwt_manager 

31 if _jwt_manager is not None: 

32 return _jwt_manager 

33 if _jwt_manager is False: 

34 return None # already tried and failed — don't log again 

35 try: 

36 from security.jwt_manager import JWTManager 

37 _jwt_manager = JWTManager() 

38 logger.info("Using hardened JWTManager") 

39 return _jwt_manager 

40 except Exception as e: 

41 logger.warning(f"JWTManager unavailable ({e}), using legacy JWT") 

42 _jwt_manager = False # sentinel: don't retry 

43 return None 

44 

45# Legacy fallback values - fail closed if not configured 

46SECRET_KEY = os.environ.get('SOCIAL_SECRET_KEY', '') 

47if not SECRET_KEY: 

48 # Auto-generate and persist so tokens survive restarts. 

49 # Stored next to the database file (writable user dir). 

50 def _load_or_create_secret_key(): 

51 db_path = os.environ.get('HEVOLVE_DB_PATH', '') 

52 if db_path and db_path != ':memory:' and os.path.isabs(db_path): 

53 key_file = os.path.join(os.path.dirname(db_path), '.social_secret_key') 

54 elif os.environ.get('NUNBA_BUNDLED') or getattr(sys, 'frozen', False): 

55 try: 

56 from core.platform_paths import get_db_dir 

57 key_file = os.path.join(get_db_dir(), '.social_secret_key') 

58 except ImportError: 

59 key_file = os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data', '.social_secret_key') 

60 else: 

61 key_file = os.path.join('agent_data', '.social_secret_key') 

62 try: 

63 if os.path.exists(key_file): 

64 with open(key_file, 'r') as f: 

65 key = f.read().strip() 

66 if len(key) >= 32: 

67 return key 

68 # Generate new key and persist 

69 key = secrets.token_hex(32) 

70 os.makedirs(os.path.dirname(key_file), exist_ok=True) 

71 with open(key_file, 'w') as f: 

72 f.write(key) 

73 # Restrict file permissions to owner read/write only (600) 

74 try: 

75 os.chmod(key_file, stat.S_IRUSR | stat.S_IWUSR) 

76 except (OSError, NotImplementedError): 

77 pass # Windows doesn't support POSIX chmod the same way 

78 logger.info(f"Generated persistent secret key at {key_file}") 

79 return key 

80 except (PermissionError, OSError) as e: 

81 logger.warning(f"Cannot persist secret key ({e}), using ephemeral") 

82 return secrets.token_hex(32) 

83 SECRET_KEY = _load_or_create_secret_key() 

84 if not os.environ.get('SOCIAL_SECRET_KEY'): 

85 logger.info("SOCIAL_SECRET_KEY not set — using auto-generated persistent key") 

86 

87TOKEN_EXPIRY = 30 * 24 * 3600 # 30 days — desktop app needs long-lived sessions 

88 

89 

90PBKDF2_ITERATIONS = 600_000 # OWASP 2023 minimum for PBKDF2-SHA256 

91 

92 

93def hash_password(password: str) -> str: 

94 salt = secrets.token_hex(16) 

95 hashed = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), PBKDF2_ITERATIONS) 

96 return f"{salt}:{hashed.hex()}" 

97 

98 

99def verify_password(password: str, stored: str) -> bool: 

100 if not stored or ':' not in stored: 

101 return False 

102 salt, hashed = stored.split(':', 1) 

103 # Support both old (100K) and new (600K) iteration counts 

104 for iterations in (PBKDF2_ITERATIONS, 100_000): 

105 check = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), iterations) 

106 if hmac.compare_digest(check.hex(), hashed): 

107 return True 

108 return False 

109 

110 

111def generate_api_token() -> str: 

112 return secrets.token_urlsafe(64) 

113 

114 

115def generate_jwt(user_id: str, username: str, role: str = 'flat', 

116 tenant_id: str = None) -> str: 

117 """Generate a LOCAL-scoped JWT (Layer 1). Works offline, survives kill switch. 

118 

119 Optional `tenant_id` adds a 'tid' claim used by central-cloud 

120 deploys to scope all subsequent queries to that tenant. 

121 Flat / regional pass tenant_id=None — the claim is omitted and 

122 every downstream query treats the request as untenanted (NULL 

123 pass-through). See plan Part E.1. 

124 

125 SECURITY: When the hardened JWTManager is available, ALL JWTs 

126 are issued through it — including tenant-scoped tokens, which 

127 are the most security-sensitive scope. Earlier revisions of 

128 this function bypassed JWTManager when tenant_id was set; that 

129 bypass is removed because it inverted the threat model 

130 (cloud tokens getting weaker hardening than flat tokens). 

131 JWTManager.generate_access_token now accepts tenant_id directly 

132 (security/jwt_manager.py). 

133 """ 

134 mgr = _get_jwt_manager() 

135 if mgr: 

136 return mgr.generate_access_token(str(user_id), username, 

137 tenant_id=tenant_id) 

138 if HAS_JWT: 

139 import uuid 

140 payload = { 

141 'user_id': str(user_id), 

142 'username': username, 

143 'role': role or 'flat', 

144 'jti': str(uuid.uuid4()), 

145 'iat': int(time.time()), 

146 'exp': int(time.time()) + TOKEN_EXPIRY, 

147 'type': 'access', 

148 'scope': 'local', 

149 } 

150 if tenant_id: 

151 payload['tid'] = str(tenant_id) 

152 return pyjwt.encode(payload, SECRET_KEY, algorithm='HS256') 

153 return generate_api_token() 

154 

155 

156def generate_jwt_with_tenant(user_id: str, username: str, role: str, 

157 tenant_id: str) -> str: 

158 """Convenience wrapper for cloud signups — signs JWT with tid claim.""" 

159 return generate_jwt(user_id, username, role, tenant_id=tenant_id) 

160 

161 

162def generate_hive_jwt(user_id: str, username: str, role: str = 'flat') -> str: 

163 """Generate a HIVE-scoped JWT (Layer 2). Ed25519-signed for cross-node verification. 

164 

165 Dies with master key revocation (kill switch). Requires certificate chain. 

166 """ 

167 mgr = _get_jwt_manager() 

168 if mgr: 

169 return mgr.generate_hive_token(str(user_id), username) 

170 # Fallback: if JWTManager unavailable, produce a local token 

171 # (hive features degrade gracefully) 

172 logger.warning("JWTManager unavailable, falling back to local token for hive request") 

173 return generate_jwt(user_id, username, role) 

174 

175 

176def verify_hive_jwt(token: str, issuer_public_key_hex: str) -> dict: 

177 """Verify a HIVE-scoped JWT from another node using its Ed25519 public key. 

178 

179 Returns payload dict on success, empty dict on failure. 

180 """ 

181 mgr = _get_jwt_manager() 

182 if mgr: 

183 result = mgr.verify_hive_token(token, issuer_public_key_hex) 

184 return result or {} 

185 return {} 

186 

187 

188def generate_token_pair(user_id: str, username: str, role: str = 'flat') -> dict: 

189 """Generate access + refresh token pair.""" 

190 mgr = _get_jwt_manager() 

191 if mgr: 

192 return mgr.generate_token_pair(str(user_id), username) 

193 return { 

194 'access_token': generate_jwt(user_id, username, role), 

195 'refresh_token': generate_api_token(), 

196 'token_type': 'bearer', 

197 'expires_in': TOKEN_EXPIRY, 

198 } 

199 

200 

201def decode_jwt(token: str) -> dict: 

202 """Decode a JWT token (any scope). Backward compatible. 

203 

204 Returns payload dict with 'scope' key ('local' for pre-upgrade tokens 

205 without scope, or the actual scope value). Empty dict on failure. 

206 """ 

207 mgr = _get_jwt_manager() 

208 if mgr: 

209 result = mgr.decode_token(token, expected_type='access') 

210 if result: 

211 # Ensure scope is always present for callers 

212 result.setdefault('scope', 'local') 

213 return result or {} 

214 if HAS_JWT: 

215 try: 

216 payload = pyjwt.decode(token, SECRET_KEY, algorithms=['HS256']) 

217 if payload.get('type') not in ('access', None): 

218 return {} # Reject non-access tokens (e.g. refresh tokens) 

219 payload.setdefault('scope', 'local') 

220 return payload 

221 except (pyjwt.ExpiredSignatureError, pyjwt.InvalidTokenError): 

222 return {} 

223 return {} 

224 

225 

226def revoke_token(token: str): 

227 """Revoke a JWT token (add to blocklist).""" 

228 mgr = _get_jwt_manager() 

229 if mgr: 

230 mgr.revoke_token(token) 

231 

232 

233def _get_user_from_token(token: str): 

234 """Look up user by API token or JWT (local or hive scope). 

235 

236 For hive tokens from other nodes: the HS256 decode will fail (different 

237 secret), so we fall through to the API token lookup. Cross-node hive 

238 verification should use verify_hive_jwt() explicitly in the endpoint. 

239 

240 Sets Flask g.token_scope to 'local', 'hive', or 'api_token' for callers. 

241 Also sets g.token_tenant_id to the JWT 'tid' claim (None for legacy 

242 tokens or API-token auth — flat/regional pass-through). 

243 """ 

244 from .models import get_db, User 

245 

246 # Try JWT first (works for local tokens and hive tokens issued by THIS node) 

247 payload = decode_jwt(token) 

248 if payload and 'user_id' in payload: 

249 db = get_db() 

250 try: 

251 user = db.query(User).filter(User.id == payload['user_id']).first() 

252 if user and not user.is_banned: 

253 try: 

254 g.token_scope = payload.get('scope', 'local') 

255 g.token_node_id = payload.get('node_id', '') 

256 g.token_tenant_id = payload.get('tid') 

257 except RuntimeError: 

258 pass # Outside request context (testing) 

259 return user, db 

260 finally: 

261 pass # keep session open for request lifecycle 

262 return None, db 

263 

264 # Fall back to raw API token lookup 

265 db = get_db() 

266 user = db.query(User).filter(User.api_token == token).first() 

267 if user and not user.is_banned: 

268 try: 

269 g.token_scope = 'api_token' 

270 g.token_node_id = '' 

271 g.token_tenant_id = None # API tokens are pre-tenancy 

272 except RuntimeError: 

273 pass 

274 return user, db 

275 return None, db 

276 

277 

278def require_auth(f): 

279 """Decorator: requires valid Bearer token. Sets g.user and g.db. 

280 

281 Phase 7a: also sets g.tenant_id (from JWT 'tid' claim) and 

282 g.feature_flags. Cloud deployments enforce that 'tid' is present 

283 when HEVOLVE_CLOUD_MODE=true; flat/regional pass through with 

284 g.tenant_id=None which downstream code treats as untenanted 

285 (NULL match in tenant_id-scoped queries — see plan Part E.1). 

286 """ 

287 @wraps(f) 

288 def decorated(*args, **kwargs): 

289 auth_header = request.headers.get('Authorization', '') 

290 if not auth_header.startswith('Bearer '): 

291 return jsonify({'success': False, 'error': 'Missing or invalid Authorization header'}), 401 

292 

293 token = auth_header[7:] 

294 user, db = _get_user_from_token(token) 

295 if user is None: 

296 if db: 

297 db.close() 

298 return jsonify({'success': False, 'error': 'Invalid or expired token'}), 401 

299 

300 g.user = user 

301 g.user_id = str(user.id) 

302 g.db = db 

303 # Tenant + feature flags. _get_user_from_token sets 

304 # g.token_tenant_id; we promote it to g.tenant_id for 

305 # downstream code. In cloud mode, missing tid is fatal. 

306 tenant_id = getattr(g, 'token_tenant_id', None) 

307 if os.environ.get('HEVOLVE_CLOUD_MODE', '').lower() == 'true' and not tenant_id: 

308 db.close() 

309 return jsonify({ 

310 'success': False, 

311 'error': 'Tenant required (cloud mode)', 

312 }), 403 

313 g.tenant_id = tenant_id 

314 try: 

315 from .feature_flags import get_flags_for_tenant 

316 g.feature_flags = get_flags_for_tenant(db, tenant_id) 

317 except ImportError: 

318 g.feature_flags = {} 

319 try: 

320 result = f(*args, **kwargs) 

321 db.commit() 

322 return result 

323 except Exception as e: 

324 if db.is_active: 

325 db.rollback() 

326 raise 

327 finally: 

328 db.close() 

329 

330 return decorated 

331 

332 

333def optional_auth(f): 

334 """Decorator: attaches user if token present, but doesn't require it. 

335 

336 Phase 7a: sets g.tenant_id and g.feature_flags identically to 

337 require_auth, with the addition that anonymous (no-token) 

338 requests get tenant_id=None (untenanted) and flags from the 

339 public default set — used by listing/discovery routes that 

340 don't need a logged-in user but should still respect tenancy 

341 when a JWT happens to be presented. 

342 """ 

343 @wraps(f) 

344 def decorated(*args, **kwargs): 

345 auth_header = request.headers.get('Authorization', '') 

346 if auth_header.startswith('Bearer '): 

347 token = auth_header[7:] 

348 user, db = _get_user_from_token(token) 

349 g.user = user 

350 g.user_id = str(user.id) if user else None 

351 g.db = db 

352 else: 

353 from .models import get_db 

354 g.user = None 

355 g.user_id = None 

356 g.db = get_db() 

357 

358 g.tenant_id = getattr(g, 'token_tenant_id', None) 

359 try: 

360 from .feature_flags import get_flags_for_tenant 

361 g.feature_flags = get_flags_for_tenant(g.db, g.tenant_id) 

362 except ImportError: 

363 g.feature_flags = {} 

364 

365 try: 

366 result = f(*args, **kwargs) 

367 g.db.commit() 

368 return result 

369 except Exception as e: 

370 g.db.rollback() 

371 raise 

372 finally: 

373 g.db.close() 

374 

375 return decorated 

376 

377 

378def require_admin(f): 

379 """Decorator: requires central (cloud admin) role or is_admin flag.""" 

380 @wraps(f) 

381 @require_auth 

382 def decorated(*args, **kwargs): 

383 user_role = getattr(g.user, 'role', None) or 'flat' 

384 if not (g.user.is_admin or user_role in ('central',)): 

385 return jsonify({'success': False, 'error': 'Admin access required'}), 403 

386 return f(*args, **kwargs) 

387 return decorated 

388 

389 

390def require_moderator(f): 

391 """Decorator: requires regional/central role, or is_admin/is_moderator flag.""" 

392 @wraps(f) 

393 @require_auth 

394 def decorated(*args, **kwargs): 

395 user_role = getattr(g.user, 'role', None) or 'flat' 

396 if not (g.user.is_admin or g.user.is_moderator or user_role in ('regional', 'central')): 

397 return jsonify({'success': False, 'error': 'Moderator access required'}), 403 

398 return f(*args, **kwargs) 

399 return decorated 

400 

401 

402def require_central(f): 

403 """Decorator: requires central (cloud admin) role.""" 

404 @wraps(f) 

405 @require_auth 

406 def decorated(*args, **kwargs): 

407 user_role = getattr(g.user, 'role', None) or 'flat' 

408 if user_role != 'central' and not g.user.is_admin: 

409 return jsonify({'success': False, 'error': 'Central access required'}), 403 

410 return f(*args, **kwargs) 

411 return decorated 

412 

413 

414def require_regional(f): 

415 """Decorator: requires regional or central role.""" 

416 @wraps(f) 

417 @require_auth 

418 def decorated(*args, **kwargs): 

419 user_role = getattr(g.user, 'role', None) or 'flat' 

420 if user_role not in ('central', 'regional') and not (g.user.is_admin or g.user.is_moderator): 

421 return jsonify({'success': False, 'error': 'Regional access required'}), 403 

422 return f(*args, **kwargs) 

423 return decorated