Coverage for integrations / agent_engine / continual_learner_gate.py: 96.1%

179 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

Continual Learner Gate — CCT issuance, validation, and learning access control.

The continual learner is the incentive. People who spend compute to help train
the model in a distributed, crowdsourced way earn access to the learned
intelligence. No contribution = no learning.

CCT (Compute Contribution Token): an Ed25519-signed token proving that a node
has contributed compute and is integrity-verified. Short-lived (24h),
offline-verifiable (zero DB calls for validation), and node-bound (useless on
other nodes).

Service pattern: static methods that take ``db: Session`` and call
``db.flush()``, never ``db.commit()``.

13""" 

14import base64 

15import json 

16import logging 

17import os 

18import time 

19import uuid 

20from datetime import datetime, timedelta 

21from typing import Dict, List, Optional 

22 

# Module-wide logger on the 'hevolve_social' channel.
logger = logging.getLogger('hevolve_social')

# ─── Learning Tier Configuration ───

# Score threshold per learning tier; compute_learning_tier compares a node's
# contribution_score against these values.
LEARNING_TIER_THRESHOLDS = dict(
    none=0,     # No learning (inference only)
    basic=50,   # Temporal coherence (predict + validate)
    full=200,   # + Manifold credit + meta-learning
    host=500,   # + RealityGroundedLearner + HiveMind + skill distribution
)

# Minimum capability_tier required for each learning tier.
MINIMUM_CAPABILITY_TIER = dict(
    none='observer',
    basic='standard',
    full='full',
    host='compute_host',
)

# Capability tiers in ascending order; a tier's position in this list is what
# gets compared when checking the minimum capability requirement.
CAPABILITY_TIER_ORDER = [
    'embedded',
    'observer',
    'lite',
    'standard',
    'full',
    'compute_host',
]

# Capabilities granted by each learning tier.
LEARNING_ACCESS_MATRIX = {
    'none': [],
    'basic': [
        'temporal_coherence',
    ],
    'full': [
        'temporal_coherence',
        'manifold_credit',
        'meta_learning',
        'embedding_sync',
    ],
    'host': [
        'temporal_coherence',
        'manifold_credit',
        'meta_learning',
        'reality_grounded',
        'hivemind_query',
        'skill_distribution',
        'embedding_sync',
    ],
}

# ─── CCT Timing ───

CCT_VALIDITY_HOURS = 24
CCT_RENEWAL_GRACE_HOURS = 2
CCT_CLOCK_SKEW_SECONDS = 5 * 60  # 5 min tolerance

# Trusted issuers cache (populated during gossip/announce).
# Maps pub_key_hex → {node_id, tier, ...}
_trusted_issuers: Dict[str, dict] = dict()

60 

61 

def register_trusted_issuer(public_key_hex: str, node_id: str,
                            tier: str = 'central'):
    """Add (or overwrite) a trusted CCT issuer in the module-level cache.

    Called during gossip merge. The entry is keyed by ``public_key_hex`` and
    stores the issuer's ``node_id``, its ``tier``, and the registration time
    taken from ``time.time()``.
    """
    entry = dict(node_id=node_id, tier=tier, registered_at=time.time())
    _trusted_issuers[public_key_hex] = entry

69 

70 

def get_trusted_issuers() -> Dict[str, dict]:
    """Return a shallow copy of the trusted-issuer cache (for testing/debug).

    Adding or removing keys on the returned dict does not affect the cache;
    the per-issuer entry dicts themselves are shared, not copied.
    """
    return _trusted_issuers.copy()

74 

75 

class ContinualLearnerGateService:
    """Manages Compute Contribution Tokens (CCTs) for learning access control.

    Every method is a static method. Methods that take ``db`` only ever call
    ``db.flush()``; committing is left to the caller.
    """

    # ─── Tier Computation ───

    @staticmethod
    def compute_learning_tier(db, node_id: str) -> Dict:
        """Compute learning access tier from contribution_score + integrity + capability.

        Args:
            db: Session used to look up the ``PeerNode`` row.
            node_id: Identifier of the node to evaluate.

        Returns:
            ``{'tier': str, 'capabilities': [...], 'eligible': bool, ...}``.
            Once the peer has been found the result also carries
            ``contribution_score``, ``integrity_status`` and
            ``capability_tier``. Ineligible results include a ``reason``
            (``models_unavailable``, ``node_not_found``,
            ``integrity_<status>`` or ``banned``).
        """
        try:
            from integrations.social.models import PeerNode
        except ImportError:
            return {'tier': 'none', 'capabilities': [], 'eligible': False,
                    'reason': 'models_unavailable'}

        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer:
            return {'tier': 'none', 'capabilities': [], 'eligible': False,
                    'reason': 'node_not_found'}

        score = peer.contribution_score or 0.0
        integrity = peer.integrity_status or 'unverified'
        cap_tier = peer.capability_tier or 'observer'

        # Must be integrity-verified
        if integrity != 'verified':
            return {
                'tier': 'none', 'capabilities': [], 'eligible': False,
                'contribution_score': score, 'integrity_status': integrity,
                'capability_tier': cap_tier, 'reason': f'integrity_{integrity}',
            }

        # Must not be banned
        if peer.ban_until and peer.ban_until > datetime.utcnow():
            return {
                'tier': 'none', 'capabilities': [], 'eligible': False,
                'contribution_score': score, 'integrity_status': integrity,
                'capability_tier': cap_tier, 'reason': 'banned',
            }

        # Unknown capability tiers are ranked at the bottom of the order.
        cap_idx = (CAPABILITY_TIER_ORDER.index(cap_tier)
                   if cap_tier in CAPABILITY_TIER_ORDER else 0)

        # Walk from the most demanding tier down; the first tier whose score
        # threshold AND capability requirement are both met wins.
        tier = 'none'
        for candidate in ['host', 'full', 'basic']:
            threshold = LEARNING_TIER_THRESHOLDS[candidate]
            min_cap_idx = CAPABILITY_TIER_ORDER.index(
                MINIMUM_CAPABILITY_TIER[candidate])
            if score >= threshold and cap_idx >= min_cap_idx:
                tier = candidate
                break

        return {
            'tier': tier,
            # Copy so callers cannot mutate the shared access matrix.
            'capabilities': list(LEARNING_ACCESS_MATRIX.get(tier, [])),
            'eligible': tier != 'none',
            'contribution_score': score,
            'integrity_status': integrity,
            'capability_tier': cap_tier,
        }

    # ─── CCT Issuance ───

    @staticmethod
    def issue_cct(db, node_id: str) -> Optional[Dict]:
        """Issue a Compute Contribution Token for an eligible node.

        The token is ``'<payload_b64>.<sig_hex>'``, where the payload is the
        compact, key-sorted JSON of the claims. Recording the attestation and
        awarding the spark are best-effort: failures there are logged at
        debug level and do not prevent the token from being returned.

        Returns:
            ``{'cct': str, 'tier': str, 'capabilities': [...],
            'expires_at': str, 'contribution_score': float}``, or ``None`` if
            the node is ineligible or the signing module is unavailable.
        """
        tier_info = ContinualLearnerGateService.compute_learning_tier(
            db, node_id)
        if not tier_info.get('eligible'):
            logger.info(f"CCT denied for {node_id}: {tier_info.get('reason')}")
            return None

        try:
            from security.node_integrity import (
                sign_json_payload, get_public_key_hex, get_node_identity)
        except ImportError:
            logger.warning("CCT issuance failed: node_integrity unavailable")
            return None

        now = int(time.time())
        nonce = uuid.uuid4().hex[:12]
        issuer_pub = get_public_key_hex()
        identity = get_node_identity()

        payload = {
            'sub': node_id,
            # NOTE(review): the 'pub' claim carries the capability tier, not a
            # public key — confirm this is intended before relying on it.
            'pub': tier_info.get('capability_tier', ''),
            'tier': tier_info['tier'],
            'cs': round(tier_info.get('contribution_score', 0), 2),
            'ist': tier_info.get('integrity_status', 'verified'),
            'iat': now,
            'exp': now + (CCT_VALIDITY_HOURS * 3600),
            'iss': issuer_pub,
            'nonce': nonce,
        }

        signature_hex = sign_json_payload(payload)
        payload_b64 = base64.urlsafe_b64encode(
            json.dumps(payload, sort_keys=True, separators=(',', ':')).encode()
        ).decode()
        cct_string = f"{payload_b64}.{signature_hex}"

        # Record attestation
        try:
            from integrations.social.models import NodeAttestation
            attestation = NodeAttestation(
                attester_node_id=identity.get('node_id', 'self'),
                subject_node_id=node_id,
                attestation_type='cct_issued',
                payload_json={
                    'tier': tier_info['tier'],
                    'contribution_score': tier_info.get('contribution_score'),
                    'validity_hours': CCT_VALIDITY_HOURS,
                    'nonce': nonce,
                },
                signature=signature_hex[:256],
                attester_public_key=issuer_pub,
                is_valid=True,
                expires_at=datetime.utcnow() + timedelta(
                    hours=CCT_VALIDITY_HOURS),
            )
            db.add(attestation)
            db.flush()
        except Exception as e:
            logger.debug(f"CCT attestation record failed: {e}")

        # Award spark for receiving CCT (learning contribution)
        try:
            from integrations.social.models import PeerNode
            peer = db.query(PeerNode).filter_by(node_id=node_id).first()
            if peer and peer.node_operator_id:
                from integrations.social.resonance_engine import ResonanceService
                ResonanceService.award_action(
                    db, peer.node_operator_id, 'learning_contribution',
                    source_id=nonce)
        except Exception as e:
            logger.debug(f"CCT spark award failed: {e}")

        logger.info(f"CCT issued for {node_id}: tier={tier_info['tier']}, "
                    f"score={tier_info.get('contribution_score', 0)}")

        return {
            'cct': cct_string,
            'tier': tier_info['tier'],
            'capabilities': tier_info['capabilities'],
            'expires_at': datetime.utcfromtimestamp(
                payload['exp']).isoformat() + 'Z',
            'contribution_score': tier_info.get('contribution_score', 0),
        }

    # ─── CCT Validation (Zero DB Calls) ───

    @staticmethod
    def _invalid(reason: str, **extra) -> Dict:
        """Build the rejection result returned by ``validate_cct``."""
        result = {'valid': False, 'tier': 'none', 'capabilities': [],
                  'reason': reason}
        result.update(extra)
        return result

    @staticmethod
    def validate_cct(cct_string: str,
                     expected_node_id: str = None) -> Dict:
        """Validate a CCT locally. Pure cryptographic verification — no DB calls.

        Checks run in this order: token shape, issuer trust, signature,
        expiry (with ``CCT_CLOCK_SKEW_SECONDS`` tolerance), node binding.

        Args:
            cct_string: Token in ``'<payload_b64>.<sig_hex>'`` form.
            expected_node_id: When given, the token's ``sub`` claim must match.

        Returns:
            On success ``{'valid': True, 'tier', 'capabilities',
            'expires_in', 'node_id', 'contribution_score', 'issued_at',
            'nonce'}``. On failure ``{'valid': False, 'tier': 'none',
            'capabilities': [], 'reason': str}`` where reason is one of
            ``malformed_token``, ``decode_error``, ``untrusted_issuer``,
            ``invalid_signature``, ``crypto_unavailable``, ``expired`` (also
            carries ``expired_seconds_ago``) or ``node_mismatch``.
        """
        invalid = ContinualLearnerGateService._invalid

        try:
            parts = cct_string.split('.')
            if len(parts) != 2:
                return invalid('malformed_token')

            payload_b64, signature_hex = parts
            payload_json = base64.urlsafe_b64decode(payload_b64).decode()
            payload = json.loads(payload_json)
        except Exception:
            return invalid('decode_error')

        # The token is untrusted input: valid JSON that is not an object
        # (e.g. a list or number) must be rejected, not crash on .get().
        if not isinstance(payload, dict):
            return invalid('decode_error')

        # Verify issuer is trusted
        issuer_pub = payload.get('iss', '')
        if issuer_pub not in _trusted_issuers:
            # Self-issued CCTs are valid if this is the issuing node
            try:
                from security.node_integrity import get_public_key_hex
                local_pub = get_public_key_hex()
                if issuer_pub != local_pub:
                    return invalid('untrusted_issuer')
            except ImportError:
                return invalid('untrusted_issuer')

        # Verify signature
        try:
            from security.node_integrity import verify_json_signature
            if not verify_json_signature(issuer_pub, payload, signature_hex):
                return invalid('invalid_signature')
        except ImportError:
            return invalid('crypto_unavailable')
        except Exception as e:
            # Fail closed: a verifier error on attacker-supplied input is
            # treated as a bad signature rather than propagated.
            logger.debug(f"CCT signature verification error: {e}")
            return invalid('invalid_signature')

        # Check expiry (with clock skew tolerance)
        now = int(time.time())
        exp = payload.get('exp', 0)
        if not isinstance(exp, (int, float)):
            return invalid('malformed_token')
        if now > exp + CCT_CLOCK_SKEW_SECONDS:
            return invalid('expired', expired_seconds_ago=now - exp)

        # Check node binding
        if expected_node_id and payload.get('sub') != expected_node_id:
            return invalid('node_mismatch')

        tier = payload.get('tier', 'none')
        return {
            'valid': True,
            'tier': tier,
            # Copy so callers cannot mutate the shared access matrix.
            'capabilities': list(LEARNING_ACCESS_MATRIX.get(tier, [])),
            'expires_in': max(0, exp - now),
            'node_id': payload.get('sub'),
            'contribution_score': payload.get('cs', 0),
            'issued_at': payload.get('iat'),
            'nonce': payload.get('nonce'),
        }

    # ─── Convenience Check ───

    @staticmethod
    def check_cct_capability(cct_string: str, capability: str,
                             expected_node_id: str = None) -> bool:
        """Quick check: does this CCT grant a specific capability?

        Returns True only when the token validates and ``capability`` is in
        the capability list of its tier.
        """
        result = ContinualLearnerGateService.validate_cct(
            cct_string, expected_node_id)
        return result.get('valid', False) and capability in result.get(
            'capabilities', [])

    # ─── CCT Renewal ───

    @staticmethod
    def renew_cct(db, node_id: str, old_cct: str = None) -> Optional[Dict]:
        """Renew an existing CCT. Re-validates eligibility.

        When ``old_cct`` is supplied it must either validate for ``node_id``
        or be rejected solely because it expired; any other rejection denies
        the renewal. A fresh token is then issued through ``issue_cct``.

        Returns:
            New CCT dict, or ``None`` if renewal is denied or the node is no
            longer eligible.
        """
        if old_cct:
            old_result = ContinualLearnerGateService.validate_cct(
                old_cct, node_id)
            if not old_result.get('valid') and old_result.get(
                    'reason') != 'expired':
                logger.info(f"CCT renewal denied for {node_id}: "
                            f"old CCT invalid ({old_result.get('reason')})")
                return None

        return ContinualLearnerGateService.issue_cct(db, node_id)

    # ─── CCT Revocation ───

    @staticmethod
    def revoke_cct(db, node_id: str, reason: str = 'manual') -> Dict:
        """Revoke a node's CCT by invalidating its attestation.

        Marks every valid ``cct_issued`` attestation for ``node_id`` as
        invalid and flushes the session.

        Returns:
            ``{'success': True, 'revoked_count': int, 'reason': str}`` or
            ``{'success': False, 'error': str}`` if anything raised.
        """
        try:
            from integrations.social.models import NodeAttestation
            attestations = db.query(NodeAttestation).filter_by(
                subject_node_id=node_id,
                attestation_type='cct_issued',
                is_valid=True,
            ).all()

            count = 0
            for att in attestations:
                att.is_valid = False
                count += 1
            db.flush()

            logger.info(f"CCT revoked for {node_id}: {reason} "
                        f"({count} attestations invalidated)")
            return {'success': True, 'revoked_count': count, 'reason': reason}
        except Exception as e:
            logger.warning(f"CCT revocation failed for {node_id}: {e}")
            return {'success': False, 'error': str(e)}

    # ─── Stats ───

    @staticmethod
    def get_learning_tier_stats(db) -> Dict:
        """Aggregate stats: nodes per tier, total contributions.

        Only peers whose status is ``active`` or ``stale`` are counted.

        Returns:
            ``{'tiers': {tier: count}, 'total_nodes': int,
            'total_contribution_score': float, 'eligible_nodes': int}``, or
            ``{'tiers': {}, 'total_nodes': 0}`` when the models cannot be
            imported.
        """
        try:
            from integrations.social.models import PeerNode
        except ImportError:
            return {'tiers': {}, 'total_nodes': 0}

        peers = db.query(PeerNode).filter(
            PeerNode.status.in_(['active', 'stale'])
        ).all()

        tier_counts = {'none': 0, 'basic': 0, 'full': 0, 'host': 0}
        total_score = 0.0

        for peer in peers:
            # NOTE: compute_learning_tier looks the peer up again by node_id,
            # so this loop issues one extra query per peer.
            info = ContinualLearnerGateService.compute_learning_tier(
                db, peer.node_id)
            tier = info.get('tier', 'none')
            tier_counts[tier] = tier_counts.get(tier, 0) + 1
            total_score += peer.contribution_score or 0.0

        return {
            'tiers': tier_counts,
            'total_nodes': len(peers),
            'total_contribution_score': round(total_score, 2),
            'eligible_nodes': sum(v for k, v in tier_counts.items()
                                  if k != 'none'),
        }

    # ─── Compute Contribution Verification ───

    @staticmethod
    def verify_compute_contribution(db, node_id: str,
                                    benchmark_result: Dict) -> Dict:
        """Verify a compute contribution microbenchmark and create attestation.

        Args:
            db: Session used for the peer lookup and the new attestation.
            node_id: Node that submitted the benchmark.
            benchmark_result: ``{'benchmark_type': str, 'score': float,
                'duration_ms': float, 'hardware_info': dict}``. A missing,
                empty, non-numeric or non-positive ``score``/``duration_ms``
                is rejected as ``invalid_benchmark``.

        Returns:
            ``{'verified': True, 'attestation_id': ..., 'score': ...}`` on
            success, otherwise ``{'verified': False, 'reason': str}`` with
            reason ``imports_unavailable``, ``node_not_found`` or
            ``invalid_benchmark``.
        """
        try:
            from integrations.social.models import PeerNode, NodeAttestation
            from security.node_integrity import (
                get_public_key_hex, sign_json_payload, get_node_identity)
        except ImportError:
            return {'verified': False, 'reason': 'imports_unavailable'}

        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer:
            return {'verified': False, 'reason': 'node_not_found'}

        # Basic validation of benchmark result. A None result or values that
        # cannot be compared with 0 are rejected instead of raising.
        benchmark_result = benchmark_result or {}
        score = benchmark_result.get('score', 0)
        duration = benchmark_result.get('duration_ms', 0)
        try:
            rejected = score <= 0 or duration <= 0
        except TypeError:
            rejected = True
        if rejected:
            return {'verified': False, 'reason': 'invalid_benchmark'}

        # Create attestation
        identity = get_node_identity()
        evidence = {
            'benchmark_type': benchmark_result.get('benchmark_type', 'unknown'),
            'score': score,
            'duration_ms': duration,
            'hardware_info': benchmark_result.get('hardware_info', {}),
            'verified_at': datetime.utcnow().isoformat(),
        }
        sig = sign_json_payload(evidence)

        attestation = NodeAttestation(
            attester_node_id=identity.get('node_id', 'self'),
            subject_node_id=node_id,
            attestation_type='compute_contribution',
            payload_json=evidence,
            signature=sig[:256],
            attester_public_key=get_public_key_hex(),
            is_valid=True,
            expires_at=datetime.utcnow() + timedelta(days=7),
        )
        db.add(attestation)
        db.flush()

        # Award spark for compute contribution (best-effort; a failure here
        # must not undo the verification, but it is logged rather than hidden).
        try:
            if peer.node_operator_id:
                from integrations.social.resonance_engine import ResonanceService
                ResonanceService.award_action(
                    db, peer.node_operator_id, 'learning_credit_assigned',
                    source_id=attestation.id)
        except Exception as e:
            logger.debug(f"Compute contribution spark award failed: {e}")

        return {
            'verified': True,
            'attestation_id': attestation.id,
            'score': score,
        }

    # ─── CCT File Management ───

    @staticmethod
    def save_cct_to_file(cct_string: str,
                         path: str = 'agent_data/cct.json'):
        """Persist CCT to local file for offline validation.

        Writes ``{'cct': ..., 'saved_at': <UTC ISO timestamp>}`` as JSON,
        creating the parent directory when the path has one. Best-effort:
        failures are logged at debug level and not raised.
        """
        try:
            parent = os.path.dirname(path)
            # A bare filename has no parent; os.makedirs('') would raise and
            # the token would silently never be written.
            if parent:
                os.makedirs(parent, exist_ok=True)
            with open(path, 'w', encoding='utf-8') as f:
                json.dump({
                    'cct': cct_string,
                    'saved_at': datetime.utcnow().isoformat(),
                }, f)
        except Exception as e:
            logger.debug(f"Failed to save CCT: {e}")

    @staticmethod
    def load_cct_from_file(path: str = 'agent_data/cct.json') -> Optional[str]:
        """Load CCT from local file.

        Returns:
            The stored token string, or ``None`` when the file is missing,
            unreadable, not valid JSON, or has no ``cct`` entry.
        """
        try:
            if os.path.isfile(path):
                with open(path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return data.get('cct')
        except Exception as e:
            logger.debug(f"Failed to load CCT: {e}")
        return None