Coverage for integrations / agent_engine / api_learning.py: 41.7%
156 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Continual Learning API Blueprint — CCT management and learning tier endpoints.
4POST /api/learning/cct/request — Node requests initial CCT
5POST /api/learning/cct/renew — Renew expiring CCT
6GET /api/learning/cct/status — Check current CCT status
7POST /api/learning/cct/verify — Verify any CCT (public)
8GET /api/learning/tiers — Tier distribution stats
9GET /api/learning/contributions — Contribution leaderboard
10POST /api/learning/benchmark — Submit compute benchmark
11"""
12import logging
14from flask import Blueprint, jsonify, request
16logger = logging.getLogger('hevolve_social')
18learning_bp = Blueprint('learning', __name__)
21def _verify_node_signature(body: dict) -> bool:
22 """Verify Ed25519 signature on request body (same as gossip announce)."""
23 try:
24 node_id = body.get('node_id', '')
25 signature = body.get('signature', '')
26 public_key = body.get('public_key', '')
27 if not all([node_id, signature, public_key]):
28 return False
29 from security.node_integrity import verify_json_signature
30 payload = {k: v for k, v in body.items() if k != 'signature'}
31 return verify_json_signature(public_key, payload, signature)
32 except Exception:
33 return False
36@learning_bp.route('/api/learning/cct/request', methods=['POST'])
37def request_cct():
38 """Node requests a new Compute Contribution Token."""
39 from integrations.social.models import get_db
40 from .continual_learner_gate import ContinualLearnerGateService
42 body = request.get_json(silent=True) or {}
43 node_id = body.get('node_id')
44 if not node_id:
45 return jsonify({'success': False, 'error': 'node_id required'}), 400
47 if not _verify_node_signature(body):
48 return jsonify({'success': False, 'error': 'invalid_signature'}), 403
50 db = get_db()
51 try:
52 result = ContinualLearnerGateService.issue_cct(db, node_id)
53 if result:
54 db.commit()
55 return jsonify({'success': True, 'data': result}), 200
56 else:
57 tier_info = ContinualLearnerGateService.compute_learning_tier(
58 db, node_id)
59 return jsonify({
60 'success': False,
61 'error': 'Not eligible for learning access',
62 'tier_info': tier_info,
63 }), 403
64 except Exception as e:
65 db.rollback()
66 logger.error(f"CCT request error: {e}")
67 return jsonify({'success': False, 'error': str(e)}), 500
68 finally:
69 db.close()
72@learning_bp.route('/api/learning/cct/renew', methods=['POST'])
73def renew_cct():
74 """Renew an existing CCT. Re-validates eligibility."""
75 from integrations.social.models import get_db
76 from .continual_learner_gate import ContinualLearnerGateService
78 body = request.get_json(silent=True) or {}
79 node_id = body.get('node_id')
80 old_cct = body.get('cct')
81 if not node_id:
82 return jsonify({'success': False, 'error': 'node_id required'}), 400
84 if not _verify_node_signature(body):
85 return jsonify({'success': False, 'error': 'invalid_signature'}), 403
87 db = get_db()
88 try:
89 result = ContinualLearnerGateService.renew_cct(db, node_id, old_cct)
90 if result:
91 db.commit()
92 return jsonify({'success': True, 'data': result}), 200
93 else:
94 return jsonify({
95 'success': False,
96 'error': 'No longer eligible for learning access',
97 }), 403
98 except Exception as e:
99 db.rollback()
100 logger.error(f"CCT renewal error: {e}")
101 return jsonify({'success': False, 'error': str(e)}), 500
102 finally:
103 db.close()
106@learning_bp.route('/api/learning/cct/status', methods=['GET'])
107def cct_status():
108 """Check current node's CCT status."""
109 from integrations.social.models import get_db
110 from .continual_learner_gate import ContinualLearnerGateService
112 node_id = request.args.get('node_id')
113 if not node_id:
114 return jsonify({'success': False, 'error': 'node_id required'}), 400
116 db = get_db()
117 try:
118 tier_info = ContinualLearnerGateService.compute_learning_tier(
119 db, node_id)
121 # Check latest CCT attestation
122 cct_info = {'has_active_cct': False}
123 try:
124 from integrations.social.models import NodeAttestation
125 from sqlalchemy import desc
126 latest = db.query(NodeAttestation).filter_by(
127 subject_node_id=node_id,
128 attestation_type='cct_issued',
129 is_valid=True,
130 ).order_by(desc(NodeAttestation.created_at)).first()
131 if latest:
132 cct_info = {
133 'has_active_cct': True,
134 'issued_at': (latest.created_at.isoformat()
135 if latest.created_at else None),
136 'expires_at': (latest.expires_at.isoformat()
137 if latest.expires_at else None),
138 'tier': (latest.payload_json or {}).get('tier', 'unknown'),
139 }
140 except Exception:
141 pass
143 return jsonify({
144 'success': True,
145 'data': {
146 'tier_info': tier_info,
147 'cct': cct_info,
148 },
149 }), 200
150 except Exception as e:
151 logger.error(f"CCT status error: {e}")
152 return jsonify({'success': False, 'error': str(e)}), 500
153 finally:
154 db.close()
157@learning_bp.route('/api/learning/cct/verify', methods=['POST'])
158def verify_cct():
159 """Verify any CCT (public endpoint — no auth required)."""
160 from .continual_learner_gate import ContinualLearnerGateService
162 body = request.get_json(silent=True) or {}
163 cct = body.get('cct')
164 expected_node = body.get('node_id')
165 if not cct:
166 return jsonify({'success': False, 'error': 'cct required'}), 400
168 result = ContinualLearnerGateService.validate_cct(cct, expected_node)
169 return jsonify({'success': True, 'data': result}), 200
172@learning_bp.route('/api/learning/tiers', methods=['GET'])
173def tier_stats():
174 """Get learning tier distribution across all nodes."""
175 from integrations.social.models import get_db
176 from .continual_learner_gate import ContinualLearnerGateService
178 db = get_db()
179 try:
180 stats = ContinualLearnerGateService.get_learning_tier_stats(db)
181 return jsonify({'success': True, 'data': stats}), 200
182 except Exception as e:
183 logger.error(f"Tier stats error: {e}")
184 return jsonify({'success': False, 'error': str(e)}), 500
185 finally:
186 db.close()
189@learning_bp.route('/api/learning/contributions', methods=['GET'])
190def contribution_leaderboard():
191 """Top compute contributors by contribution_score."""
192 from integrations.social.models import get_db
194 limit = request.args.get('limit', 50, type=int)
195 db = get_db()
196 try:
197 from integrations.social.models import PeerNode
198 from sqlalchemy import desc
199 peers = db.query(PeerNode).filter(
200 PeerNode.status.in_(['active', 'stale']),
201 PeerNode.contribution_score > 0,
202 ).order_by(
203 desc(PeerNode.contribution_score)
204 ).limit(min(limit, 200)).all()
206 leaderboard = []
207 for i, peer in enumerate(peers, 1):
208 leaderboard.append({
209 'rank': i,
210 'node_id': peer.node_id,
211 'contribution_score': round(peer.contribution_score or 0, 2),
212 'capability_tier': peer.capability_tier,
213 'integrity_status': peer.integrity_status,
214 'visibility_tier': peer.visibility_tier,
215 })
217 return jsonify({'success': True, 'data': leaderboard}), 200
218 except Exception as e:
219 logger.error(f"Contribution leaderboard error: {e}")
220 return jsonify({'success': False, 'error': str(e)}), 500
221 finally:
222 db.close()
225@learning_bp.route('/api/learning/benchmark', methods=['POST'])
226def submit_benchmark():
227 """Submit a compute benchmark result for verification."""
228 from integrations.social.models import get_db
229 from .continual_learner_gate import ContinualLearnerGateService
231 body = request.get_json(silent=True) or {}
232 node_id = body.get('node_id')
233 if not node_id:
234 return jsonify({'success': False, 'error': 'node_id required'}), 400
236 if not _verify_node_signature(body):
237 return jsonify({'success': False, 'error': 'invalid_signature'}), 403
239 benchmark_result = {
240 'benchmark_type': body.get('benchmark_type', 'unknown'),
241 'score': body.get('score', 0),
242 'duration_ms': body.get('duration_ms', 0),
243 'hardware_info': body.get('hardware_info', {}),
244 }
246 db = get_db()
247 try:
248 result = ContinualLearnerGateService.verify_compute_contribution(
249 db, node_id, benchmark_result)
250 db.commit()
251 return jsonify({'success': True, 'data': result}), 200
252 except Exception as e:
253 db.rollback()
254 logger.error(f"Benchmark submit error: {e}")
255 return jsonify({'success': False, 'error': str(e)}), 500
256 finally:
257 db.close()
260# ─── Gradient Sync Endpoints ───
262@learning_bp.route('/api/learning/gradient/submit', methods=['POST'])
263def submit_gradient():
264 """Submit a compressed embedding delta for distributed aggregation."""
265 from integrations.social.models import get_db
266 from .gradient_service import GradientSyncService
267 from .embedding_delta import compress_delta
269 body = request.get_json(silent=True) or {}
270 node_id = body.get('node_id')
271 if not node_id:
272 return jsonify({'success': False, 'error': 'node_id required'}), 400
274 if not _verify_node_signature(body):
275 return jsonify({'success': False, 'error': 'invalid_signature'}), 403
277 # Accept pre-compressed delta or raw values
278 delta = body.get('delta')
279 if not delta:
280 raw_values = body.get('values', [])
281 k = body.get('compression_k', 32)
282 if not raw_values:
283 return jsonify({'success': False,
284 'error': 'delta or values required'}), 400
285 delta = compress_delta(raw_values, method='top_k', k=k)
287 cct = body.get('cct')
289 db = get_db()
290 try:
291 result = GradientSyncService.submit_embedding_delta(
292 db, node_id, delta, cct_string=cct)
293 if result.get('accepted'):
294 db.commit()
295 return jsonify({'success': result.get('accepted', False),
296 'data': result}), 200
297 except Exception as e:
298 db.rollback()
299 logger.error(f"Gradient submit error: {e}")
300 return jsonify({'success': False, 'error': str(e)}), 500
301 finally:
302 db.close()
305@learning_bp.route('/api/learning/gradient/status', methods=['GET'])
306def gradient_status():
307 """Get current embedding sync convergence status."""
308 from integrations.social.models import get_db
309 from .gradient_service import GradientSyncService
311 db = get_db()
312 try:
313 status = GradientSyncService.get_convergence_status(db)
314 return jsonify({'success': True, 'data': status}), 200
315 except Exception as e:
316 logger.error(f"Gradient status error: {e}")
317 return jsonify({'success': False, 'error': str(e)}), 500
318 finally:
319 db.close()