Coverage for integrations / agent_engine / learning_tools.py: 56.2%

80 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Continual Learning Agent Tools — AutoGen tools for learning coordination. 

3 

4Handles: CCT issuance/management, compute contribution verification, 

5learning tier monitoring, skill distribution gating. 

6 

7Intelligence is earned through contribution. Every compute cycle donated 

8makes the hive smarter. 90% of value flows back to contributors. 

9 

10Tier 2 tools (agent_engine context). Same pattern as content_gen_tools.py. 

11""" 

12import json 

13import logging 

14 

15logger = logging.getLogger('hevolve_social') 

16 

17 

18def check_learning_health() -> str: 

19 """Check the continual learning pipeline health: tier distribution, CCT stats, bridge status.""" 

20 try: 

21 from integrations.social.models import get_db 

22 from .continual_learner_gate import ContinualLearnerGateService 

23 

24 db = get_db() 

25 try: 

26 tier_stats = ContinualLearnerGateService.get_learning_tier_stats(db) 

27 

28 # Check WorldModelBridge health 

29 bridge_health = {'healthy': False, 'mode': 'unknown'} 

30 try: 

31 from .world_model_bridge import get_world_model_bridge 

32 bridge = get_world_model_bridge() 

33 if bridge: 

34 bridge_health = bridge.check_health() 

35 except Exception: 

36 pass 

37 

38 return json.dumps({ 

39 'learning_health': { 

40 'status': 'healthy' if tier_stats.get('eligible_nodes', 0) > 0 else 'bootstrapping', 

41 'tier_distribution': tier_stats.get('tiers', {}), 

42 'total_nodes': tier_stats.get('total_nodes', 0), 

43 'eligible_for_learning': tier_stats.get('eligible_nodes', 0), 

44 'total_contribution_score': tier_stats.get('total_contribution_score', 0), 

45 'world_model_bridge': bridge_health, 

46 }, 

47 }) 

48 finally: 

49 db.close() 

50 except Exception as e: 

51 return json.dumps({'error': str(e)}) 

52 

53 

54def verify_compute_contribution(node_id: str, 

55 benchmark_type: str = 'credit_assignment', 

56 score: float = 0.0, 

57 duration_ms: float = 0.0) -> str: 

58 """Verify a node's compute contribution via microbenchmark result.""" 

59 try: 

60 from integrations.social.models import get_db 

61 from .continual_learner_gate import ContinualLearnerGateService 

62 

63 db = get_db() 

64 try: 

65 result = ContinualLearnerGateService.verify_compute_contribution( 

66 db, node_id, { 

67 'benchmark_type': benchmark_type, 

68 'score': score, 

69 'duration_ms': duration_ms, 

70 }) 

71 db.commit() 

72 return json.dumps(result) 

73 finally: 

74 db.close() 

75 except Exception as e: 

76 return json.dumps({'error': str(e)}) 

77 

78 

79def issue_cct(node_id: str) -> str: 

80 """Issue a Compute Contribution Token for an eligible node.""" 

81 try: 

82 from integrations.social.models import get_db 

83 from .continual_learner_gate import ContinualLearnerGateService 

84 

85 db = get_db() 

86 try: 

87 result = ContinualLearnerGateService.issue_cct(db, node_id) 

88 if result: 

89 # Save CCT locally if this is our node 

90 ContinualLearnerGateService.save_cct_to_file(result['cct']) 

91 db.commit() 

92 return json.dumps({ 

93 'success': True, 

94 'tier': result['tier'], 

95 'capabilities': result['capabilities'], 

96 'expires_at': result['expires_at'], 

97 }) 

98 else: 

99 return json.dumps({ 

100 'success': False, 

101 'reason': 'Node not eligible for learning access', 

102 }) 

103 finally: 

104 db.close() 

105 except Exception as e: 

106 return json.dumps({'error': str(e)}) 

107 

108 

109def get_learning_tier_stats() -> str: 

110 """Get aggregate learning tier statistics across all nodes.""" 

111 try: 

112 from integrations.social.models import get_db 

113 from .continual_learner_gate import ContinualLearnerGateService 

114 

115 db = get_db() 

116 try: 

117 stats = ContinualLearnerGateService.get_learning_tier_stats(db) 

118 return json.dumps(stats) 

119 finally: 

120 db.close() 

121 except Exception as e: 

122 return json.dumps({'error': str(e)}) 

123 

124 

125def distribute_learning_skill(skill_description: str = '', 

126 target_tier: str = 'basic') -> str: 

127 """Distribute a learned skill packet to eligible nodes via RALT.""" 

128 try: 

129 from .world_model_bridge import get_world_model_bridge 

130 from .continual_learner_gate import LEARNING_ACCESS_MATRIX 

131 

132 bridge = get_world_model_bridge() 

133 if not bridge: 

134 return json.dumps({'error': 'WorldModelBridge unavailable'}) 

135 

136 # Only distribute to nodes with skill_distribution capability 

137 if 'skill_distribution' not in LEARNING_ACCESS_MATRIX.get(target_tier, []): 

138 return json.dumps({ 

139 'error': f'Tier {target_tier} does not have skill_distribution capability', 

140 'minimum_tier': 'host', 

141 }) 

142 

143 ralt_packet = { 

144 'skill_type': 'continual_learning', 

145 'description': skill_description, 

146 'target_tier': target_tier, 

147 'distributed_at': __import__('datetime').datetime.utcnow().isoformat(), 

148 } 

149 

150 try: 

151 from security.node_integrity import get_node_identity 

152 identity = get_node_identity() 

153 result = bridge.distribute_skill_packet( 

154 ralt_packet, identity.get('node_id', 'self')) 

155 return json.dumps(result) 

156 except Exception as e: 

157 return json.dumps({'error': str(e)}) 

158 

159 except Exception as e: 

160 return json.dumps({'error': str(e)}) 

161 

162 

163def get_node_learning_status(node_id: str) -> str: 

164 """Get a specific node's learning tier, CCT status, and contribution score.""" 

165 try: 

166 from integrations.social.models import get_db 

167 from .continual_learner_gate import ContinualLearnerGateService 

168 

169 db = get_db() 

170 try: 

171 tier_info = ContinualLearnerGateService.compute_learning_tier( 

172 db, node_id) 

173 

174 # Check if node has a valid CCT attestation 

175 try: 

176 from integrations.social.models import NodeAttestation 

177 from sqlalchemy import desc 

178 latest_cct = db.query(NodeAttestation).filter_by( 

179 subject_node_id=node_id, 

180 attestation_type='cct_issued', 

181 is_valid=True, 

182 ).order_by(desc(NodeAttestation.created_at)).first() 

183 

184 cct_status = { 

185 'has_active_cct': latest_cct is not None, 

186 'cct_expires': (latest_cct.expires_at.isoformat() 

187 if latest_cct and latest_cct.expires_at 

188 else None), 

189 } 

190 except Exception: 

191 cct_status = {'has_active_cct': False} 

192 

193 return json.dumps({ 

194 'node_id': node_id, 

195 'learning_tier': tier_info, 

196 'cct_status': cct_status, 

197 }) 

198 finally: 

199 db.close() 

200 except Exception as e: 

201 return json.dumps({'error': str(e)}) 

202 

203 

204# Tool registration for ServiceToolRegistry 

205LEARNING_TOOLS = [ 

206 { 

207 'name': 'check_learning_health', 

208 'func': check_learning_health, 

209 'description': 'Check continual learning pipeline health and tier distribution', 

210 'tags': ['learning'], 

211 }, 

212 { 

213 'name': 'verify_compute_contribution', 

214 'func': verify_compute_contribution, 

215 'description': 'Verify a node compute contribution via microbenchmark', 

216 'tags': ['learning'], 

217 }, 

218 { 

219 'name': 'issue_cct', 

220 'func': issue_cct, 

221 'description': 'Issue a Compute Contribution Token for an eligible node', 

222 'tags': ['learning'], 

223 }, 

224 { 

225 'name': 'get_learning_tier_stats', 

226 'func': get_learning_tier_stats, 

227 'description': 'Get aggregate learning tier statistics across all nodes', 

228 'tags': ['learning'], 

229 }, 

230 { 

231 'name': 'distribute_learning_skill', 

232 'func': distribute_learning_skill, 

233 'description': 'Distribute a learned skill packet to eligible nodes', 

234 'tags': ['learning'], 

235 }, 

236 { 

237 'name': 'get_node_learning_status', 

238 'func': get_node_learning_status, 

239 'description': 'Get a node learning tier, CCT status, and contribution score', 

240 'tags': ['learning'], 

241 }, 

242]