Coverage for security / jwt_manager.py: 81.9%
182 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Hardened JWT Manager — Two-Layer Authentication
4Layer 1 (LOCAL): Node's own HS256 secret signs local JWTs.
5 Works offline, survives kill switch, survives central outage.
6Layer 2 (HIVE): HS256 + Ed25519 node_sig for cross-node identity.
7 Certificate chain verified — killed by master key revocation.
9The master key NEVER appears in JWT signing, token verification, or local auth.
10The master key's SOLE purpose is the kill switch for the distributed intelligence.
11"""
13import json
14import os
15import time
16import uuid
17import logging
18import hashlib
19import hmac
20from typing import Optional, Tuple
21from functools import lru_cache
23logger = logging.getLogger('hevolve_security')
25try:
26 import jwt as pyjwt
27 HAS_JWT = True
28except ImportError:
29 HAS_JWT = False
31import sys as _sys
32# Desktop apps need long-lived sessions — users don't re-login daily.
33# Cloud/server deploys should use short-lived tokens via env override.
34_BUNDLED = getattr(_sys, 'frozen', False) or os.environ.get('NUNBA_BUNDLED')
35ACCESS_TOKEN_EXPIRY = int(os.environ.get(
36 'JWT_ACCESS_EXPIRY',
37 30 * 86400 if _BUNDLED else 3600)) # 30 days bundled, 1 hour server
38REFRESH_TOKEN_EXPIRY = int(os.environ.get(
39 'JWT_REFRESH_EXPIRY',
40 90 * 86400 if _BUNDLED else 86400 * 7)) # 90 days bundled, 7 days server
41GRACE_PERIOD_EXPIRY = 86400 * 30 # 30 days - accept old tokens during migration
44class TokenBlocklist:
45 """In-memory token blocklist with optional Redis backend."""
47 def __init__(self):
48 self._memory_blocklist: set = set()
49 self._redis = None
50 self._init_redis()
52 def _init_redis(self):
53 try:
54 import redis
55 redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
56 self._redis = redis.from_url(redis_url, decode_responses=True)
57 self._redis.ping()
58 except Exception:
59 self._redis = None
61 def add(self, jti: str, expires_in: int = ACCESS_TOKEN_EXPIRY):
62 """Add a token JTI to the blocklist."""
63 self._memory_blocklist.add(jti)
64 if self._redis:
65 try:
66 self._redis.setex(f"jwt_blocklist:{jti}", expires_in, "1")
67 except Exception:
68 pass
70 def is_blocked(self, jti: str) -> bool:
71 """Check if a token JTI is blocked."""
72 if jti in self._memory_blocklist:
73 return True
74 if self._redis:
75 try:
76 return self._redis.exists(f"jwt_blocklist:{jti}") > 0
77 except Exception:
78 pass
79 return False
82# Singleton blocklist
83_blocklist = TokenBlocklist()
86def _get_node_id() -> str:
87 """Return the first 16 hex chars of this node's Ed25519 public key."""
88 try:
89 from security.node_integrity import get_public_key_hex
90 return get_public_key_hex()[:16]
91 except Exception:
92 return 'unknown'
95class JWTManager:
96 """Secure JWT token management with two-layer auth.
98 Layer 1 — LOCAL tokens:
99 HS256 signed with per-node SOCIAL_SECRET_KEY.
100 scope='local', iss='node:{node_id}'.
101 Work offline. Survive kill switch.
103 Layer 2 — HIVE tokens:
104 HS256 + Ed25519 node_sig (in custom 'node_sig' claim).
105 scope='hive', iss='hive:hevolve'.
106 Verifiable by any node that knows the issuer's public key.
107 Dies with certificate chain revocation (kill switch).
108 """
110 def __init__(self, secret_key: Optional[str] = None):
111 self._secret_key = secret_key or self._load_secret_key()
112 self._validate_secret_key()
114 @staticmethod
115 def _load_secret_key() -> str:
116 key = os.environ.get('SOCIAL_SECRET_KEY', '')
117 if not key:
118 try:
119 from security.secrets_manager import get_secret
120 key = get_secret('SOCIAL_SECRET_KEY')
121 except Exception:
122 pass
123 if not key:
124 # Read from the persistent key file that auth.py creates.
125 # This ensures JWTManager and legacy auth use the SAME key,
126 # so tokens generated by either can be verified by either.
127 candidates = [
128 os.path.join(os.environ.get('HEVOLVE_DB_PATH', ''), '..', '.social_secret_key'),
129 os.path.join('agent_data', '.social_secret_key'),
130 ]
131 try:
132 from core.platform_paths import get_db_dir
133 candidates.insert(1, os.path.join(get_db_dir(), '.social_secret_key'))
134 except ImportError:
135 candidates.insert(1, os.path.join(
136 os.path.expanduser('~'), 'Documents', 'Nunba', 'data', '.social_secret_key'))
137 for candidate in candidates:
138 try:
139 candidate = os.path.normpath(candidate)
140 if os.path.exists(candidate):
141 with open(candidate, 'r') as f:
142 key = f.read().strip()
143 if len(key) >= 32:
144 break
145 key = ''
146 except Exception:
147 continue
148 return key
150 def _validate_secret_key(self):
151 """Reject weak/default secrets."""
152 weak_defaults = [
153 'hevolve-social-secret-change-in-production',
154 'secret', 'changeme', 'password', '',
155 ]
156 if self._secret_key in weak_defaults:
157 raise RuntimeError(
158 "SOCIAL_SECRET_KEY is weak or default. "
159 "Set a strong secret via environment variable."
160 )
161 if len(self._secret_key) < 32:
162 logger.warning("SOCIAL_SECRET_KEY is shorter than 32 characters. Consider using a longer key.")
164 def generate_access_token(self, user_id: str, username: str,
165 tenant_id: str = None) -> str:
166 """Generate a short-lived LOCAL access token (1 hour).
168 Optional `tenant_id` adds a 'tid' claim — used by central
169 cloud deploys for tenant isolation. The claim flows through
170 the same hardened signing pipeline as every other claim;
171 callers MUST NOT bypass JWTManager when adding tenancy.
172 """
173 return self._generate_token(
174 user_id, username, 'access', ACCESS_TOKEN_EXPIRY, scope='local',
175 tenant_id=tenant_id
176 )
178 def generate_refresh_token(self, user_id: str, username: str,
179 tenant_id: str = None) -> str:
180 """Generate a LOCAL refresh token (7 days)."""
181 return self._generate_token(
182 user_id, username, 'refresh', REFRESH_TOKEN_EXPIRY, scope='local',
183 tenant_id=tenant_id
184 )
186 def generate_token_pair(self, user_id: str, username: str,
187 tenant_id: str = None) -> dict:
188 """Generate both access and refresh tokens (local scope)."""
189 return {
190 'access_token': self.generate_access_token(user_id, username, tenant_id=tenant_id),
191 'refresh_token': self.generate_refresh_token(user_id, username, tenant_id=tenant_id),
192 'token_type': 'bearer',
193 'expires_in': ACCESS_TOKEN_EXPIRY,
194 'scope': 'local',
195 }
197 def _generate_token(self, user_id: str, username: str,
198 token_type: str, expiry: int,
199 scope: str = 'local',
200 tenant_id: str = None) -> str:
201 if not HAS_JWT:
202 # HMAC fallback cannot encode tenant_id (no JSON payload).
203 # In cloud mode this is a hard failure — refuse rather than
204 # silently issuing an untenanted token.
205 if tenant_id:
206 raise RuntimeError(
207 "Cloud mode requires PyJWT to issue tenant-scoped tokens; "
208 "HMAC fallback cannot carry the 'tid' claim safely.")
209 return self._generate_hmac_token(user_id, username)
211 node_id = _get_node_id()
212 payload = {
213 'user_id': user_id,
214 'username': username,
215 'jti': str(uuid.uuid4()),
216 'iat': int(time.time()),
217 'exp': int(time.time()) + expiry,
218 'type': token_type,
219 'scope': scope,
220 'node_id': node_id,
221 'iss': f'node:{node_id}' if scope == 'local' else 'hive:hevolve',
222 }
223 if tenant_id:
224 payload['tid'] = str(tenant_id)
225 return pyjwt.encode(payload, self._secret_key, algorithm='HS256')
227 def _generate_hmac_token(self, user_id: str, username: str) -> str:
228 """Fallback token generation without PyJWT."""
229 raw = f"{user_id}:{username}:{time.time()}:{uuid.uuid4()}"
230 return hmac.new(
231 self._secret_key.encode(), raw.encode(), hashlib.sha256
232 ).hexdigest()
234 def decode_token(self, token: str, expected_type: str = 'access') -> Optional[dict]:
235 """Decode and validate a JWT token (any scope).
237 Backward compatible: tokens without 'scope' are treated as local.
238 Returns payload dict or None if invalid.
239 """
240 if not HAS_JWT:
241 return None
243 try:
244 payload = pyjwt.decode(
245 token, self._secret_key, algorithms=['HS256']
246 )
248 # Check token type
249 token_type = payload.get('type', 'access')
250 if token_type != expected_type:
251 logger.warning(f"Token type mismatch: expected {expected_type}, got {token_type}")
252 return None
254 # Check blocklist
255 jti = payload.get('jti')
256 if jti and _blocklist.is_blocked(jti):
257 logger.warning(f"Blocked token used: jti={jti}")
258 return None
260 return payload
262 except pyjwt.ExpiredSignatureError:
263 return None
264 except pyjwt.InvalidTokenError as e:
265 logger.warning(f"Invalid token: {e}")
266 return None
268 def revoke_token(self, token: str):
269 """Revoke a token by adding its JTI to the blocklist."""
270 if not HAS_JWT:
271 return
273 try:
274 # Decode without verification to get JTI (token may already be expired)
275 payload = pyjwt.decode(
276 token, self._secret_key, algorithms=['HS256'],
277 options={'verify_exp': False}
278 )
279 jti = payload.get('jti')
280 if jti:
281 exp = payload.get('exp', 0)
282 ttl = max(exp - int(time.time()), ACCESS_TOKEN_EXPIRY)
283 _blocklist.add(jti, ttl)
284 logger.info(f"Token revoked: jti={jti}")
285 except Exception:
286 pass
288 def refresh_access_token(self, refresh_token: str) -> Optional[dict]:
289 """
290 Use a refresh token to generate a new access token.
291 Returns new token pair or None if refresh token is invalid.
292 """
293 payload = self.decode_token(refresh_token, expected_type='refresh')
294 if not payload:
295 return None
297 # Revoke old refresh token (rotation)
298 self.revoke_token(refresh_token)
300 return self.generate_token_pair(
301 payload['user_id'], payload['username']
302 )
304 # ── Layer 2: Hive tokens ────────────────────────────────────
306 def generate_hive_token(self, user_id: str, username: str) -> str:
307 """Generate a hive-scoped access token with Ed25519 node signature.
309 The JWT is HS256-signed (readable by this node) AND carries a
310 'node_sig' claim: the Ed25519 signature over the canonical payload
311 (minus node_sig itself). Any peer that knows our public key can
312 verify the node_sig without needing our HS256 secret.
313 """
314 if not HAS_JWT:
315 return self._generate_hmac_token(user_id, username)
317 node_id = _get_node_id()
318 payload = {
319 'user_id': user_id,
320 'username': username,
321 'jti': str(uuid.uuid4()),
322 'iat': int(time.time()),
323 'exp': int(time.time()) + ACCESS_TOKEN_EXPIRY,
324 'type': 'access',
325 'scope': 'hive',
326 'node_id': node_id,
327 'iss': 'hive:hevolve',
328 }
330 # Sign the payload with this node's Ed25519 key
331 try:
332 from security.node_integrity import sign_json_payload
333 payload['node_sig'] = sign_json_payload(payload)
334 except Exception as e:
335 logger.warning(f"Could not sign hive token with Ed25519: {e}")
336 # Still produce a valid JWT, just without cross-node verifiability
337 payload['node_sig'] = ''
339 return pyjwt.encode(payload, self._secret_key, algorithm='HS256')
341 def verify_hive_token(self, token: str,
342 issuer_public_key_hex: str) -> Optional[dict]:
343 """Verify a hive-scoped token from another node.
345 Two verification paths:
346 1. If we share the HS256 secret (same node or shared secret):
347 decode normally, then also verify node_sig.
348 2. If we only have the issuer's Ed25519 public key:
349 decode WITHOUT HS256 verification, then verify node_sig.
351 Returns the payload dict on success, None on failure.
352 """
353 if not HAS_JWT:
354 return None
356 payload = None
358 # Path 1: try HS256 decode (works if this is the issuing node)
359 try:
360 payload = pyjwt.decode(
361 token, self._secret_key, algorithms=['HS256']
362 )
363 except (pyjwt.InvalidTokenError, pyjwt.ExpiredSignatureError):
364 pass
366 # Path 2: decode without HS256 verification (cross-node)
367 if payload is None:
368 try:
369 payload = pyjwt.decode(
370 token, options={
371 'verify_signature': False,
372 'verify_exp': True,
373 }, algorithms=['HS256']
374 )
375 except (pyjwt.ExpiredSignatureError, pyjwt.InvalidTokenError):
376 return None
378 if not payload:
379 return None
381 # Must be a hive token
382 if payload.get('scope') != 'hive':
383 logger.warning("verify_hive_token called on non-hive token")
384 return None
386 # Check blocklist
387 jti = payload.get('jti')
388 if jti and _blocklist.is_blocked(jti):
389 logger.warning(f"Blocked hive token used: jti={jti}")
390 return None
392 # Verify Ed25519 node_sig
393 node_sig = payload.get('node_sig', '')
394 if not node_sig:
395 logger.warning("Hive token missing node_sig claim")
396 return None
398 try:
399 from security.node_integrity import verify_json_signature
400 # Strip node_sig from payload before verification — it was not
401 # present when the signature was computed during generation.
402 verify_payload = {k: v for k, v in payload.items()
403 if k != 'node_sig'}
404 if not verify_json_signature(
405 issuer_public_key_hex, verify_payload, node_sig):
406 logger.warning("Hive token node_sig verification failed")
407 return None
408 except Exception as e:
409 logger.warning(f"Hive token Ed25519 verification error: {e}")
410 return None
412 return payload
414 def decode_local_token(self, token: str,
415 expected_type: str = 'access') -> Optional[dict]:
416 """Decode a local-scoped token. Backward compatible.
418 Tokens without a 'scope' claim are treated as local (pre-upgrade tokens).
419 Rejects hive-scoped tokens — use verify_hive_token() for those.
420 """
421 payload = self.decode_token(token, expected_type=expected_type)
422 if payload is None:
423 return None
425 scope = payload.get('scope')
426 if scope is not None and scope != 'local':
427 logger.warning(
428 f"decode_local_token rejected token with scope={scope}")
429 return None
431 return payload