Coverage for integrations / agent_engine / continual_learner_gate.py: 96.1%
179 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Continual Learner Gate — CCT issuance, validation, and learning access control.

The continual learner is the incentive. People who spend compute to help train
the model in a distributed, crowdsourced way earn access to the learned
intelligence. No contribution = no learning.

CCT (Compute Contribution Token): Ed25519-signed token proving a node has
contributed compute and is integrity-verified. Short-lived (24h), offline-
verifiable (zero DB calls for validation), node-bound (useless on other nodes).

Service Pattern: static methods, db: Session, db.flush() not db.commit().
13"""
14import base64
15import json
16import logging
17import os
18import time
19import uuid
20from datetime import datetime, timedelta
21from typing import Dict, List, Optional
logger = logging.getLogger('hevolve_social')

# ─── Learning Tier Configuration ───

# Minimum contribution_score needed to reach each learning tier.
LEARNING_TIER_THRESHOLDS = {
    'none': 0,     # No learning (inference only)
    'basic': 50,   # Temporal coherence (predict + validate)
    'full': 200,   # + Manifold credit + meta-learning
    'host': 500,   # + RealityGroundedLearner + HiveMind + skill distribution
}

# Minimum capability_tier a node must have to hold each learning tier.
MINIMUM_CAPABILITY_TIER = {
    'none': 'observer',
    'basic': 'standard',
    'full': 'full',
    'host': 'compute_host',
}

# Capability tiers ordered from weakest to strongest; list position is the rank
# used when comparing a node's capability against a tier's minimum.
CAPABILITY_TIER_ORDER = [
    'embedded',
    'observer',
    'lite',
    'standard',
    'full',
    'compute_host',
]

# Learning capabilities unlocked by each learning tier.
LEARNING_ACCESS_MATRIX = {
    'none': [],
    'basic': [
        'temporal_coherence',
    ],
    'full': [
        'temporal_coherence',
        'manifold_credit',
        'meta_learning',
        'embedding_sync',
    ],
    'host': [
        'temporal_coherence',
        'manifold_credit',
        'meta_learning',
        'reality_grounded',
        'hivemind_query',
        'skill_distribution',
        'embedding_sync',
    ],
}

# Token lifetime, renewal grace window, and tolerated clock drift on expiry.
CCT_VALIDITY_HOURS = 24
CCT_RENEWAL_GRACE_HOURS = 2
CCT_CLOCK_SKEW_SECONDS = 5 * 60  # 5 min tolerance

# Trusted issuers cache (populated during gossip/announce).
# Maps pub_key_hex → {node_id, tier, ...}
_trusted_issuers: Dict[str, dict] = {}
def register_trusted_issuer(public_key_hex: str, node_id: str,
                            tier: str = 'central'):
    """Record a node as a trusted CCT issuer (called during gossip merge).

    The entry is stored in the module-level issuer cache under
    ``public_key_hex``; registering the same key again overwrites the
    previous entry and refreshes its ``registered_at`` timestamp.
    """
    issuer_record = {
        'node_id': node_id,
        'tier': tier,
        'registered_at': time.time(),
    }
    _trusted_issuers[public_key_hex] = issuer_record
def get_trusted_issuers() -> Dict[str, dict]:
    """Return a snapshot of the trusted issuers cache (for testing/debug).

    The returned mapping is a shallow copy: adding or removing keys does not
    affect the cache, but the per-issuer dicts are the same objects.
    """
    snapshot = _trusted_issuers.copy()
    return snapshot
class ContinualLearnerGateService:
    """Manages Compute Contribution Tokens for learning access control.

    All methods are static. Methods that receive ``db`` only call
    ``db.flush()`` and never ``db.commit()``; committing is left to the caller.
    """

    # ─── Tier Computation ───

    @staticmethod
    def compute_learning_tier(db, node_id: str) -> Dict:
        """Compute learning access tier from contribution_score + integrity + capability.

        Looks up the PeerNode row for ``node_id`` and derives its tier.

        Returns: {'tier': str, 'capabilities': [...], 'contribution_score': float,
                  'integrity_status': str, 'capability_tier': str, 'eligible': bool}
        When the node is ineligible a 'reason' key is included; when the models
        are unavailable or the node is unknown, only tier/capabilities/eligible/
        reason are returned.
        """
        try:
            from integrations.social.models import PeerNode
        except ImportError:
            return {'tier': 'none', 'capabilities': [], 'eligible': False,
                    'reason': 'models_unavailable'}

        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer:
            return {'tier': 'none', 'capabilities': [], 'eligible': False,
                    'reason': 'node_not_found'}

        return ContinualLearnerGateService._tier_info_for_peer(peer)

    @staticmethod
    def _tier_info_for_peer(peer) -> Dict:
        """Derive the learning-tier dict from an already-loaded peer row."""
        score = peer.contribution_score or 0.0
        integrity = peer.integrity_status or 'unverified'
        cap_tier = peer.capability_tier or 'observer'

        # Must be integrity-verified
        if integrity != 'verified':
            return {
                'tier': 'none', 'capabilities': [], 'eligible': False,
                'contribution_score': score, 'integrity_status': integrity,
                'capability_tier': cap_tier, 'reason': f'integrity_{integrity}',
            }

        # Must not be banned
        if peer.ban_until and peer.ban_until > datetime.utcnow():
            return {
                'tier': 'none', 'capabilities': [], 'eligible': False,
                'contribution_score': score, 'integrity_status': integrity,
                'capability_tier': cap_tier, 'reason': 'banned',
            }

        # Unknown capability tiers rank as the weakest one (index 0).
        cap_idx = (CAPABILITY_TIER_ORDER.index(cap_tier)
                   if cap_tier in CAPABILITY_TIER_ORDER else 0)

        # Walk tiers from strongest to weakest and take the first one whose
        # score threshold and minimum capability are both satisfied.
        tier = 'none'
        for candidate in ('host', 'full', 'basic'):
            threshold = LEARNING_TIER_THRESHOLDS[candidate]
            min_cap_idx = CAPABILITY_TIER_ORDER.index(
                MINIMUM_CAPABILITY_TIER[candidate])
            if score >= threshold and cap_idx >= min_cap_idx:
                tier = candidate
                break

        return {
            'tier': tier,
            # Copy so callers cannot mutate the shared access matrix.
            'capabilities': list(LEARNING_ACCESS_MATRIX.get(tier, [])),
            'eligible': tier != 'none',
            'contribution_score': score,
            'integrity_status': integrity,
            'capability_tier': cap_tier,
        }

    # ─── CCT Issuance ───

    @staticmethod
    def issue_cct(db, node_id: str) -> Optional[Dict]:
        """Issue a Compute Contribution Token for an eligible node.

        Recording the attestation and awarding the spark are best-effort:
        failures there are logged at debug level and do not block issuance.

        Returns: {'cct': '<payload_b64>.<sig_hex>', 'tier': str, 'expires_at': str,
                  'capabilities': [...], 'contribution_score': float}
                 or None if the node is ineligible or signing is unavailable.
        """
        tier_info = ContinualLearnerGateService.compute_learning_tier(
            db, node_id)
        if not tier_info.get('eligible'):
            logger.info(f"CCT denied for {node_id}: {tier_info.get('reason')}")
            return None

        try:
            from security.node_integrity import (
                sign_json_payload, get_public_key_hex, get_node_identity)
        except ImportError:
            logger.warning("CCT issuance failed: node_integrity unavailable")
            return None

        now = int(time.time())
        nonce = uuid.uuid4().hex[:12]
        issuer_pub = get_public_key_hex()
        identity = get_node_identity()

        # NOTE(review): the 'pub' claim carries the capability tier, not a
        # public key — confirm this is intended before renaming it, since
        # changing the payload changes what gets signed.
        payload = {
            'sub': node_id,
            'pub': tier_info.get('capability_tier', ''),
            'tier': tier_info['tier'],
            'cs': round(tier_info.get('contribution_score', 0), 2),
            'ist': tier_info.get('integrity_status', 'verified'),
            'iat': now,
            'exp': now + (CCT_VALIDITY_HOURS * 3600),
            'iss': issuer_pub,
            'nonce': nonce,
        }

        signature_hex = sign_json_payload(payload)
        payload_b64 = base64.urlsafe_b64encode(
            json.dumps(payload, sort_keys=True, separators=(',', ':')).encode()
        ).decode()
        cct_string = f"{payload_b64}.{signature_hex}"

        # Record attestation
        try:
            from integrations.social.models import NodeAttestation
            attestation = NodeAttestation(
                attester_node_id=identity.get('node_id', 'self'),
                subject_node_id=node_id,
                attestation_type='cct_issued',
                payload_json={
                    'tier': tier_info['tier'],
                    'contribution_score': tier_info.get('contribution_score'),
                    'validity_hours': CCT_VALIDITY_HOURS,
                    'nonce': nonce,
                },
                signature=signature_hex[:256],
                attester_public_key=issuer_pub,
                is_valid=True,
                expires_at=datetime.utcnow() + timedelta(
                    hours=CCT_VALIDITY_HOURS),
            )
            db.add(attestation)
            db.flush()
        except Exception as e:
            logger.debug(f"CCT attestation record failed: {e}")

        # Award spark for receiving CCT (learning contribution)
        try:
            from integrations.social.models import PeerNode
            peer = db.query(PeerNode).filter_by(node_id=node_id).first()
            if peer and peer.node_operator_id:
                from integrations.social.resonance_engine import ResonanceService
                ResonanceService.award_action(
                    db, peer.node_operator_id, 'learning_contribution',
                    source_id=nonce)
        except Exception as e:
            logger.debug(f"CCT spark award failed: {e}")

        logger.info(f"CCT issued for {node_id}: tier={tier_info['tier']}, "
                    f"score={tier_info.get('contribution_score', 0)}")

        return {
            'cct': cct_string,
            'tier': tier_info['tier'],
            'capabilities': tier_info['capabilities'],
            'expires_at': datetime.utcfromtimestamp(
                payload['exp']).isoformat() + 'Z',
            'contribution_score': tier_info.get('contribution_score', 0),
        }

    # ─── CCT Validation (Zero DB Calls) ───

    @staticmethod
    def _invalid_cct(reason: str, **extra) -> Dict:
        """Build the standard 'token rejected' result for validate_cct."""
        result = {'valid': False, 'tier': 'none', 'capabilities': [],
                  'reason': reason}
        result.update(extra)
        return result

    @staticmethod
    def validate_cct(cct_string: str,
                     expected_node_id: str = None) -> Dict:
        """Validate a CCT locally. Pure cryptographic verification — no DB calls.

        Checks, in order: token shape, issuer trust (registered issuer or this
        node's own key), signature, expiry (with clock-skew tolerance), and —
        when ``expected_node_id`` is given — that the token is bound to it.
        Never raises on a malformed token; every failure is reported through
        the returned dict.

        Returns: {'valid': bool, 'tier': str, 'capabilities': [...],
                  'expires_in': seconds, 'reason': str}
        'reason' is present only when valid is False; on success the dict also
        carries node_id, contribution_score, issued_at and nonce.
        """
        invalid = ContinualLearnerGateService._invalid_cct

        try:
            parts = cct_string.split('.')
            if len(parts) != 2:
                return invalid('malformed_token')

            payload_b64, signature_hex = parts
            payload_json = base64.urlsafe_b64decode(payload_b64).decode()
            payload = json.loads(payload_json)
        except Exception:
            return invalid('decode_error')

        # The token is untrusted input: valid JSON that is not an object
        # (list, string, number, ...) would break the .get() calls below.
        if not isinstance(payload, dict):
            return invalid('malformed_token')

        # Verify issuer is trusted
        issuer_pub = payload.get('iss', '')
        if not isinstance(issuer_pub, str):
            # A non-string issuer can never match a registered hex key, and an
            # unhashable one would raise on the membership test below.
            return invalid('untrusted_issuer')
        if issuer_pub not in _trusted_issuers:
            # Self-issued CCTs are valid if this is the issuing node
            try:
                from security.node_integrity import get_public_key_hex
                local_pub = get_public_key_hex()
                if issuer_pub != local_pub:
                    return invalid('untrusted_issuer')
            except ImportError:
                return invalid('untrusted_issuer')

        # Verify signature
        try:
            from security.node_integrity import verify_json_signature
            if not verify_json_signature(issuer_pub, payload, signature_hex):
                return invalid('invalid_signature')
        except ImportError:
            return invalid('crypto_unavailable')

        # Check expiry (with clock skew tolerance)
        now = int(time.time())
        exp = payload.get('exp', 0)
        if isinstance(exp, bool) or not isinstance(exp, (int, float)):
            # A signed token with a non-numeric expiry cannot be evaluated.
            return invalid('malformed_token')
        if now > exp + CCT_CLOCK_SKEW_SECONDS:
            return invalid('expired', expired_seconds_ago=now - exp)

        # Check node binding
        if expected_node_id and payload.get('sub') != expected_node_id:
            return invalid('node_mismatch')

        tier = payload.get('tier', 'none')
        return {
            'valid': True,
            'tier': tier,
            # Copy so callers cannot mutate the shared access matrix.
            'capabilities': list(LEARNING_ACCESS_MATRIX.get(tier, [])),
            'expires_in': max(0, exp - now),
            'node_id': payload.get('sub'),
            'contribution_score': payload.get('cs', 0),
            'issued_at': payload.get('iat'),
            'nonce': payload.get('nonce'),
        }

    # ─── Convenience Check ───

    @staticmethod
    def check_cct_capability(cct_string: str, capability: str,
                             expected_node_id: str = None) -> bool:
        """Quick check: does this CCT grant a specific capability?

        Returns True only when the token validates and ``capability`` is in
        the capability list of its tier.
        """
        result = ContinualLearnerGateService.validate_cct(
            cct_string, expected_node_id)
        return result.get('valid', False) and capability in result.get(
            'capabilities', [])

    # ─── CCT Renewal ───

    @staticmethod
    def renew_cct(db, node_id: str, old_cct: str = None) -> Optional[Dict]:
        """Renew an existing CCT. Re-validates eligibility.

        If ``old_cct`` is supplied it must either validate for ``node_id`` or
        be rejected only because it expired; any other rejection denies the
        renewal. Without ``old_cct`` this is equivalent to issue_cct.

        Returns new CCT dict or None if no longer eligible.
        """
        if old_cct:
            old_result = ContinualLearnerGateService.validate_cct(
                old_cct, node_id)
            if not old_result.get('valid') and old_result.get(
                    'reason') != 'expired':
                logger.info(f"CCT renewal denied for {node_id}: "
                            f"old CCT invalid ({old_result.get('reason')})")
                return None

        return ContinualLearnerGateService.issue_cct(db, node_id)

    # ─── CCT Revocation ───

    @staticmethod
    def revoke_cct(db, node_id: str, reason: str = 'manual') -> Dict:
        """Revoke a node's CCT by invalidating its attestation.

        Marks every currently valid 'cct_issued' attestation for ``node_id``
        as invalid and flushes.

        Returns: {'success': True, 'revoked_count': int, 'reason': str} or
                 {'success': False, 'error': str} if anything raised.
        """
        try:
            from integrations.social.models import NodeAttestation
            attestations = db.query(NodeAttestation).filter_by(
                subject_node_id=node_id,
                attestation_type='cct_issued',
                is_valid=True,
            ).all()

            count = 0
            for att in attestations:
                att.is_valid = False
                count += 1
            db.flush()

            logger.info(f"CCT revoked for {node_id}: {reason} "
                        f"({count} attestations invalidated)")
            return {'success': True, 'revoked_count': count, 'reason': reason}
        except Exception as e:
            logger.warning(f"CCT revocation failed for {node_id}: {e}")
            return {'success': False, 'error': str(e)}

    # ─── Stats ───

    @staticmethod
    def get_learning_tier_stats(db) -> Dict:
        """Aggregate stats: nodes per tier, total contributions.

        Considers peers whose status is 'active' or 'stale'.

        Returns: {'tiers': {tier: count}, 'total_nodes': int,
                  'total_contribution_score': float, 'eligible_nodes': int}
                 or {'tiers': {}, 'total_nodes': 0} if the models are
                 unavailable.
        """
        try:
            from integrations.social.models import PeerNode
        except ImportError:
            return {'tiers': {}, 'total_nodes': 0}

        peers = db.query(PeerNode).filter(
            PeerNode.status.in_(['active', 'stale'])
        ).all()

        tier_counts = {'none': 0, 'basic': 0, 'full': 0, 'host': 0}
        total_score = 0.0

        for peer in peers:
            # The rows are already loaded; derive the tier from them directly
            # rather than issuing one more lookup query per peer.
            info = ContinualLearnerGateService._tier_info_for_peer(peer)
            tier = info.get('tier', 'none')
            tier_counts[tier] = tier_counts.get(tier, 0) + 1
            total_score += peer.contribution_score or 0.0

        return {
            'tiers': tier_counts,
            'total_nodes': len(peers),
            'total_contribution_score': round(total_score, 2),
            'eligible_nodes': sum(v for k, v in tier_counts.items()
                                  if k != 'none'),
        }

    # ─── Compute Contribution Verification ───

    @staticmethod
    def verify_compute_contribution(db, node_id: str,
                                    benchmark_result: Dict) -> Dict:
        """Verify a compute contribution microbenchmark and create attestation.

        benchmark_result: {'benchmark_type': str, 'score': float,
                           'duration_ms': float, 'hardware_info': dict}

        'score' and 'duration_ms' must both be positive numbers. On success a
        'compute_contribution' attestation valid for 7 days is added and
        flushed, and a spark award is attempted (best-effort).

        Returns: {'verified': True, 'attestation_id': ..., 'score': ...} or
                 {'verified': False, 'reason': str}.
        """
        try:
            from integrations.social.models import PeerNode, NodeAttestation
            from security.node_integrity import (
                get_public_key_hex, sign_json_payload, get_node_identity)
        except ImportError:
            return {'verified': False, 'reason': 'imports_unavailable'}

        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer:
            return {'verified': False, 'reason': 'node_not_found'}

        # Basic validation of benchmark result
        if not isinstance(benchmark_result, dict):
            return {'verified': False, 'reason': 'invalid_benchmark'}
        score = benchmark_result.get('score', 0)
        duration = benchmark_result.get('duration_ms', 0)
        try:
            # Written as "> 0" so NaN is rejected as well as zero/negatives.
            is_positive = bool(score > 0 and duration > 0)
        except TypeError:
            # Non-numeric values (None, strings, ...) are not comparable.
            is_positive = False
        if not is_positive:
            return {'verified': False, 'reason': 'invalid_benchmark'}

        # Create attestation
        identity = get_node_identity()
        evidence = {
            'benchmark_type': benchmark_result.get('benchmark_type', 'unknown'),
            'score': score,
            'duration_ms': duration,
            'hardware_info': benchmark_result.get('hardware_info', {}),
            'verified_at': datetime.utcnow().isoformat(),
        }
        sig = sign_json_payload(evidence)

        attestation = NodeAttestation(
            attester_node_id=identity.get('node_id', 'self'),
            subject_node_id=node_id,
            attestation_type='compute_contribution',
            payload_json=evidence,
            signature=sig[:256],
            attester_public_key=get_public_key_hex(),
            is_valid=True,
            expires_at=datetime.utcnow() + timedelta(days=7),
        )
        db.add(attestation)
        db.flush()

        # Award spark for compute contribution (best-effort)
        try:
            if peer.node_operator_id:
                from integrations.social.resonance_engine import ResonanceService
                ResonanceService.award_action(
                    db, peer.node_operator_id, 'learning_credit_assigned',
                    source_id=attestation.id)
        except Exception as e:
            logger.debug(f"Compute contribution spark award failed: {e}")

        return {
            'verified': True,
            'attestation_id': attestation.id,
            'score': score,
        }

    # ─── CCT File Management ───

    @staticmethod
    def save_cct_to_file(cct_string: str,
                         path: str = 'agent_data/cct.json'):
        """Persist CCT to local file for offline validation.

        Best-effort: any failure is logged at debug level and swallowed.
        """
        try:
            directory = os.path.dirname(path)
            # A bare filename has no directory part; os.makedirs('') raises,
            # which previously made the save fail silently.
            if directory:
                os.makedirs(directory, exist_ok=True)
            with open(path, 'w', encoding='utf-8') as f:
                json.dump({
                    'cct': cct_string,
                    'saved_at': datetime.utcnow().isoformat(),
                }, f)
        except Exception as e:
            logger.debug(f"Failed to save CCT: {e}")

    @staticmethod
    def load_cct_from_file(path: str = 'agent_data/cct.json') -> Optional[str]:
        """Load CCT from local file.

        Returns the stored token string, or None if the file is missing,
        unreadable, or does not contain a 'cct' entry.
        """
        try:
            if os.path.isfile(path):
                with open(path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return data.get('cct')
        except Exception as e:
            logger.debug(f"Failed to load CCT: {e}")
        return None