Coverage for integrations / agent_engine / api_learning.py: 41.7%

156 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

Continual Learning API Blueprint — CCT management, learning tier and gradient sync endpoints.

POST /api/learning/cct/request — Node requests initial CCT
POST /api/learning/cct/renew — Renew expiring CCT
GET /api/learning/cct/status — Check current CCT status
POST /api/learning/cct/verify — Verify any CCT (public)
GET /api/learning/tiers — Tier distribution stats
GET /api/learning/contributions — Contribution leaderboard
POST /api/learning/benchmark — Submit compute benchmark
POST /api/learning/gradient/submit — Submit compressed embedding delta
GET /api/learning/gradient/status — Embedding sync convergence status

11""" 

12import logging 

13 

14from flask import Blueprint, jsonify, request 

15 

# Module logger. Uses the fixed 'hevolve_social' logger name instead of
# __name__, so records from this module appear under that name.
logger = logging.getLogger('hevolve_social')

# Flask blueprint on which the /api/learning/* routes below are registered.
learning_bp = Blueprint('learning', __name__)

19 

20 

21def _verify_node_signature(body: dict) -> bool: 

22 """Verify Ed25519 signature on request body (same as gossip announce).""" 

23 try: 

24 node_id = body.get('node_id', '') 

25 signature = body.get('signature', '') 

26 public_key = body.get('public_key', '') 

27 if not all([node_id, signature, public_key]): 

28 return False 

29 from security.node_integrity import verify_json_signature 

30 payload = {k: v for k, v in body.items() if k != 'signature'} 

31 return verify_json_signature(public_key, payload, signature) 

32 except Exception: 

33 return False 

34 

35 

@learning_bp.route('/api/learning/cct/request', methods=['POST'])
def request_cct():
    """Issue a new Compute Contribution Token (CCT) for a signed node request.

    Expects a JSON object with ``node_id`` plus the ``signature`` and
    ``public_key`` fields checked by ``_verify_node_signature``.

    Returns:
        A ``(response, status)`` tuple:

        * 200 with the token data when ``issue_cct`` returns a truthy result
          (the session is committed first);
        * 400 when ``node_id`` is missing or the body is not a JSON object;
        * 403 when the signature check fails, or when ``issue_cct`` returns
          a falsy result (the response then includes ``tier_info``);
        * 500 with the exception text when the service layer raises (the
          session is rolled back).
    """
    from integrations.social.models import get_db
    from .continual_learner_gate import ContinualLearnerGateService

    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get();
        # treat it as an empty body so the caller gets a clean 400.
        body = {}

    node_id = body.get('node_id')
    if not node_id:
        return jsonify({'success': False, 'error': 'node_id required'}), 400

    if not _verify_node_signature(body):
        return jsonify({'success': False, 'error': 'invalid_signature'}), 403

    db = get_db()
    try:
        result = ContinualLearnerGateService.issue_cct(db, node_id)
        if result:
            db.commit()
            return jsonify({'success': True, 'data': result}), 200

        # Not eligible: report the computed tier so the node can see why.
        tier_info = ContinualLearnerGateService.compute_learning_tier(
            db, node_id)
        return jsonify({
            'success': False,
            'error': 'Not eligible for learning access',
            'tier_info': tier_info,
        }), 403
    except Exception as e:
        db.rollback()
        # logger.exception keeps the traceback alongside the message.
        logger.exception("CCT request error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()

70 

71 

@learning_bp.route('/api/learning/cct/renew', methods=['POST'])
def renew_cct():
    """Renew an existing CCT for a signed node request.

    Expects a JSON object with ``node_id``, the previous token under
    ``cct``, and the ``signature`` / ``public_key`` fields checked by
    ``_verify_node_signature``. The previous token is passed to
    ``ContinualLearnerGateService.renew_cct`` as-is (``None`` when absent).

    Returns:
        A ``(response, status)`` tuple:

        * 200 with the renewed token data when ``renew_cct`` returns a
          truthy result (the session is committed first);
        * 400 when ``node_id`` is missing or the body is not a JSON object;
        * 403 when the signature check fails or ``renew_cct`` returns a
          falsy result;
        * 500 with the exception text when the service layer raises (the
          session is rolled back).
    """
    from integrations.social.models import get_db
    from .continual_learner_gate import ContinualLearnerGateService

    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get();
        # treat it as an empty body so the caller gets a clean 400.
        body = {}

    node_id = body.get('node_id')
    old_cct = body.get('cct')
    if not node_id:
        return jsonify({'success': False, 'error': 'node_id required'}), 400

    if not _verify_node_signature(body):
        return jsonify({'success': False, 'error': 'invalid_signature'}), 403

    db = get_db()
    try:
        result = ContinualLearnerGateService.renew_cct(db, node_id, old_cct)
        if result:
            db.commit()
            return jsonify({'success': True, 'data': result}), 200

        return jsonify({
            'success': False,
            'error': 'No longer eligible for learning access',
        }), 403
    except Exception as e:
        db.rollback()
        # logger.exception keeps the traceback alongside the message.
        logger.exception("CCT renewal error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()

104 

105 

@learning_bp.route('/api/learning/cct/status', methods=['GET'])
def cct_status():
    """Report a node's learning tier and its latest CCT attestation.

    Reads ``node_id`` from the query string. The tier comes from
    ``ContinualLearnerGateService.compute_learning_tier``. The CCT part is
    the most recent ``NodeAttestation`` row for that node with
    ``attestation_type='cct_issued'`` and ``is_valid=True``.

    Returns:
        A ``(response, status)`` tuple:

        * 200 with ``{'tier_info': ..., 'cct': ...}``; ``cct`` is
          ``{'has_active_cct': False}`` when no matching attestation is
          found or the lookup fails;
        * 400 when ``node_id`` is missing;
        * 500 with the exception text when computing the tier raises.
    """
    from integrations.social.models import get_db
    from .continual_learner_gate import ContinualLearnerGateService

    node_id = request.args.get('node_id')
    if not node_id:
        return jsonify({'success': False, 'error': 'node_id required'}), 400

    db = get_db()
    try:
        tier_info = ContinualLearnerGateService.compute_learning_tier(
            db, node_id)

        # Check latest CCT attestation. This is best-effort: a failure
        # degrades to "no active CCT" instead of failing the request.
        # NOTE(review): the query filters on is_valid only, not on
        # expires_at -- confirm expired rows are invalidated elsewhere.
        cct_info = {'has_active_cct': False}
        try:
            from integrations.social.models import NodeAttestation
            from sqlalchemy import desc
            latest = db.query(NodeAttestation).filter_by(
                subject_node_id=node_id,
                attestation_type='cct_issued',
                is_valid=True,
            ).order_by(desc(NodeAttestation.created_at)).first()
            if latest:
                cct_info = {
                    'has_active_cct': True,
                    'issued_at': (latest.created_at.isoformat()
                                  if latest.created_at else None),
                    'expires_at': (latest.expires_at.isoformat()
                                   if latest.expires_at else None),
                    'tier': (latest.payload_json or {}).get('tier', 'unknown'),
                }
        except Exception as exc:
            # Previously swallowed silently; keep the fallback but log it
            # so a broken lookup does not pass for "no CCT".
            logger.warning(
                "CCT attestation lookup failed for %s: %s", node_id, exc)

        return jsonify({
            'success': True,
            'data': {
                'tier_info': tier_info,
                'cct': cct_info,
            },
        }), 200
    except Exception as e:
        logger.exception("CCT status error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()

155 

156 

@learning_bp.route('/api/learning/cct/verify', methods=['POST'])
def verify_cct():
    """Verify any CCT (public endpoint — no signature check is performed).

    Expects a JSON object with the token under ``cct`` and, optionally, the
    node it should belong to under ``node_id``. Both are passed to
    ``ContinualLearnerGateService.validate_cct``.

    Returns:
        A ``(response, status)`` tuple:

        * 200 with the ``validate_cct`` result under ``data``. ``success``
          is always ``True`` here; it means the check ran, and the verdict
          itself is in ``data``;
        * 400 when ``cct`` is missing or the body is not a JSON object;
        * 500 with the exception text when validation raises.
    """
    from .continual_learner_gate import ContinualLearnerGateService

    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get();
        # treat it as an empty body so the caller gets a clean 400.
        body = {}

    cct = body.get('cct')
    expected_node = body.get('node_id')
    if not cct:
        return jsonify({'success': False, 'error': 'cct required'}), 400

    try:
        result = ContinualLearnerGateService.validate_cct(cct, expected_node)
        return jsonify({'success': True, 'data': result}), 200
    except Exception as e:
        # Match the JSON error envelope used by the other endpoints rather
        # than letting the framework return a generic 500 page.
        logger.exception("CCT verify error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500

170 

171 

@learning_bp.route('/api/learning/tiers', methods=['GET'])
def tier_stats():
    """Return the learning tier statistics reported by the gate service.

    Returns:
        A ``(response, status)`` tuple: 200 with the result of
        ``get_learning_tier_stats`` under ``data``, or 500 with the
        exception text when that call (or serialising it) raises.
    """
    from integrations.social.models import get_db
    from .continual_learner_gate import ContinualLearnerGateService

    session = get_db()
    try:
        tier_distribution = (
            ContinualLearnerGateService.get_learning_tier_stats(session))
        ok_payload = {'success': True, 'data': tier_distribution}
        return jsonify(ok_payload), 200
    except Exception as err:
        logger.error(f"Tier stats error: {err}")
        failure_payload = {'success': False, 'error': str(err)}
        return jsonify(failure_payload), 500
    finally:
        # Always release the session, whichever branch returned.
        session.close()

187 

188 

@learning_bp.route('/api/learning/contributions', methods=['GET'])
def contribution_leaderboard():
    """List the top compute contributors ordered by ``contribution_score``.

    Query parameters:
        limit: Maximum number of rows (default 50). Values are clamped to
            the range 0..200; a value that does not parse as an integer
            falls back to the default.

    Only ``PeerNode`` rows whose status is ``'active'`` or ``'stale'`` and
    whose ``contribution_score`` is greater than zero are included.

    Returns:
        A ``(response, status)`` tuple: 200 with a list of ranked entries
        (rank starts at 1), or 500 with the exception text on failure.
    """
    from integrations.social.models import get_db

    limit = request.args.get('limit', 50, type=int)
    # Clamp both ends. A negative LIMIT is "no limit" on SQLite and an
    # error on other backends, so min() alone does not enforce the cap.
    limit = max(0, min(limit, 200))

    db = get_db()
    try:
        from integrations.social.models import PeerNode
        from sqlalchemy import desc
        peers = db.query(PeerNode).filter(
            PeerNode.status.in_(['active', 'stale']),
            PeerNode.contribution_score > 0,
        ).order_by(
            desc(PeerNode.contribution_score)
        ).limit(limit).all()

        leaderboard = [
            {
                'rank': rank,
                'node_id': peer.node_id,
                'contribution_score': round(peer.contribution_score or 0, 2),
                'capability_tier': peer.capability_tier,
                'integrity_status': peer.integrity_status,
                'visibility_tier': peer.visibility_tier,
            }
            for rank, peer in enumerate(peers, 1)
        ]

        return jsonify({'success': True, 'data': leaderboard}), 200
    except Exception as e:
        logger.exception("Contribution leaderboard error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()

223 

224 

@learning_bp.route('/api/learning/benchmark', methods=['POST'])
def submit_benchmark():
    """Accept a signed compute benchmark result and pass it on for verification.

    Expects a JSON object with ``node_id``, the ``signature`` /
    ``public_key`` fields checked by ``_verify_node_signature``, and the
    optional benchmark fields ``benchmark_type`` (default ``'unknown'``),
    ``score`` (default ``0``), ``duration_ms`` (default ``0``) and
    ``hardware_info`` (default ``{}``).

    Returns:
        A ``(response, status)`` tuple:

        * 200 with the ``verify_compute_contribution`` result (the session
          is committed first);
        * 400 when ``node_id`` is missing or the body is not a JSON object;
        * 403 when the signature check fails;
        * 500 with the exception text when the service layer raises (the
          session is rolled back).
    """
    from integrations.social.models import get_db
    from .continual_learner_gate import ContinualLearnerGateService

    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get();
        # treat it as an empty body so the caller gets a clean 400.
        body = {}

    node_id = body.get('node_id')
    if not node_id:
        return jsonify({'success': False, 'error': 'node_id required'}), 400

    if not _verify_node_signature(body):
        return jsonify({'success': False, 'error': 'invalid_signature'}), 403

    # Forward only the known benchmark fields, not the whole request body.
    benchmark_result = {
        'benchmark_type': body.get('benchmark_type', 'unknown'),
        'score': body.get('score', 0),
        'duration_ms': body.get('duration_ms', 0),
        'hardware_info': body.get('hardware_info', {}),
    }

    db = get_db()
    try:
        result = ContinualLearnerGateService.verify_compute_contribution(
            db, node_id, benchmark_result)
        db.commit()
        return jsonify({'success': True, 'data': result}), 200
    except Exception as e:
        db.rollback()
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Benchmark submit error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()

258 

259 

260# ─── Gradient Sync Endpoints ─── 

261 

@learning_bp.route('/api/learning/gradient/submit', methods=['POST'])
def submit_gradient():
    """Submit a compressed embedding delta for distributed aggregation.

    Expects a JSON object with ``node_id``, the ``signature`` /
    ``public_key`` fields checked by ``_verify_node_signature``, and either

    * ``delta`` — an already compressed delta, used as-is; or
    * ``values`` — raw values, compressed here with
      ``compress_delta(values, method='top_k', k=compression_k)``, where
      ``compression_k`` defaults to 32.

    An optional ``cct`` is passed to the gradient service as ``cct_string``.

    Returns:
        A ``(response, status)`` tuple:

        * 200 with the service result under ``data``; ``success`` mirrors
          the result's ``accepted`` flag, and the session is committed only
          when the delta was accepted;
        * 400 when ``node_id`` is missing, neither ``delta`` nor ``values``
          is supplied, the body is not a JSON object, or compression
          rejects the input;
        * 403 when the signature check fails;
        * 500 with the exception text on any other failure.
    """
    from integrations.social.models import get_db
    from .gradient_service import GradientSyncService
    from .embedding_delta import compress_delta

    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get();
        # treat it as an empty body so the caller gets a clean 400.
        body = {}

    node_id = body.get('node_id')
    if not node_id:
        return jsonify({'success': False, 'error': 'node_id required'}), 400

    if not _verify_node_signature(body):
        return jsonify({'success': False, 'error': 'invalid_signature'}), 403

    # Accept pre-compressed delta or raw values
    delta = body.get('delta')
    if not delta:
        raw_values = body.get('values', [])
        k = body.get('compression_k', 32)
        if not raw_values:
            return jsonify({'success': False,
                            'error': 'delta or values required'}), 400
        # values and compression_k are caller-controlled, so a compression
        # failure must produce a JSON error, not an unhandled exception.
        # NOTE(review): assumes compress_delta raises TypeError/ValueError
        # on malformed input -- confirm against embedding_delta.
        try:
            delta = compress_delta(raw_values, method='top_k', k=k)
        except (TypeError, ValueError) as e:
            logger.warning("Gradient compression rejected input: %s", e)
            return jsonify({'success': False,
                            'error': 'invalid values or compression_k'}), 400
        except Exception as e:
            logger.exception("Gradient compression error: %s", e)
            return jsonify({'success': False, 'error': str(e)}), 500

    cct = body.get('cct')

    db = get_db()
    try:
        result = GradientSyncService.submit_embedding_delta(
            db, node_id, delta, cct_string=cct)
        if result.get('accepted'):
            db.commit()
        return jsonify({'success': result.get('accepted', False),
                        'data': result}), 200
    except Exception as e:
        db.rollback()
        logger.exception("Gradient submit error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()

303 

304 

@learning_bp.route('/api/learning/gradient/status', methods=['GET'])
def gradient_status():
    """Return the convergence status reported by the gradient sync service.

    Returns:
        A ``(response, status)`` tuple: 200 with the result of
        ``get_convergence_status`` under ``data``, or 500 with the
        exception text when that call (or serialising it) raises.
    """
    from integrations.social.models import get_db
    from .gradient_service import GradientSyncService

    session = get_db()
    try:
        convergence = GradientSyncService.get_convergence_status(session)
        ok_payload = {'success': True, 'data': convergence}
        return jsonify(ok_payload), 200
    except Exception as err:
        logger.error(f"Gradient status error: {err}")
        failure_payload = {'success': False, 'error': str(err)}
        return jsonify(failure_payload), 500
    finally:
        # Always release the session, whichever branch returned.
        session.close()