Coverage for integrations / agent_engine / gradient_tools.py: 51.0%

49 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Gradient Sync Agent Tools — AutoGen tools for distributed embedding sync. 

3 

44 Phase 1 tools + 2 Phase 2 stubs for LoRA gradient sync. 

5Tier 2 tools (agent_engine context). Same pattern as learning_tools.py. 

6 

7Intelligence is earned through contribution. Every compute cycle donated 

8makes the hive smarter. 

9""" 

10import json 

11import logging 

12 

13logger = logging.getLogger('hevolve_social') 

14 

15 

def submit_embedding_delta(node_id: str,
                           values: str = '[]',
                           dimension: int = 64,
                           compression_k: int = 32) -> str:
    """Compress an embedding delta and hand it to the gradient sync service.

    ``values`` is decoded from JSON when it is a string and used as-is
    otherwise. Anything that is not a list after decoding is rejected with
    an error payload before a database session is opened. The list is then
    compressed with ``compress_delta(..., method='top_k', k=compression_k)``
    and passed to ``GradientSyncService.submit_embedding_delta``.

    The session is committed only when the service result has a truthy
    ``'accepted'`` entry, and it is closed in every case.

    Args:
        node_id: The submitting node's ID.
        values: JSON-encoded list of float values (the raw delta).
        dimension: Embedding dimension. Not read by this function.
        compression_k: Number of top-k components to keep.

    Returns:
        The service result serialized as JSON, or ``{"error": ...}`` when
        validation fails or any exception is raised along the way.
    """
    try:
        # Imported lazily; an import failure is reported as an error payload
        # by the handler at the bottom of this function.
        from integrations.social.models import get_db
        from .gradient_service import GradientSyncService
        from .embedding_delta import compress_delta

        if isinstance(values, str):
            decoded = json.loads(values)
        else:
            decoded = values

        if not isinstance(decoded, list):
            return json.dumps({'error': 'values must be a JSON list of floats'})

        compressed = compress_delta(decoded, method='top_k', k=compression_k)

        session = get_db()
        try:
            outcome = GradientSyncService.submit_embedding_delta(
                session, node_id, compressed)
            accepted = outcome.get('accepted')
            if accepted:
                session.commit()
            return json.dumps(outcome)
        finally:
            session.close()
    except Exception as exc:
        return json.dumps({'error': str(exc)})

50 

51 

def get_gradient_sync_status() -> str:
    """Return the distributed embedding sync convergence status as JSON.

    Opens a database session, asks
    ``GradientSyncService.get_convergence_status`` for the status and
    serializes whatever it returns. The session is always closed.

    Returns:
        The status serialized as JSON, or ``{"error": ...}`` when any
        exception (including a failed import) is raised.
    """
    try:
        from integrations.social.models import get_db
        from .gradient_service import GradientSyncService

        session = get_db()
        try:
            convergence = GradientSyncService.get_convergence_status(session)
            payload = json.dumps(convergence)
            return payload
        finally:
            session.close()
    except Exception as exc:
        return json.dumps({'error': str(exc)})

66 

67 

def request_embedding_witnesses(node_id: str, delta_json: str = '{}') -> str:
    """Ask the gradient sync service for peer witnesses of a delta.

    ``delta_json`` is decoded from JSON when it is a string and forwarded
    unchanged otherwise; no further validation is done here. The decoded
    delta and ``node_id`` are passed to
    ``GradientSyncService.request_embedding_witnesses`` (delta first, node
    ID second). The session is always closed.

    Args:
        node_id: ID of the node requesting witnesses.
        delta_json: JSON-encoded delta, or an already-decoded object.

    Returns:
        The service result serialized as JSON, or ``{"error": ...}`` when
        any exception (including a failed import or bad JSON) is raised.
    """
    try:
        from integrations.social.models import get_db
        from .gradient_service import GradientSyncService

        if isinstance(delta_json, str):
            parsed_delta = json.loads(delta_json)
        else:
            parsed_delta = delta_json

        session = get_db()
        try:
            witnesses = GradientSyncService.request_embedding_witnesses(
                session, parsed_delta, node_id)
            return json.dumps(witnesses)
        finally:
            session.close()
    except Exception as exc:
        return json.dumps({'error': str(exc)})

85 

86 

def trigger_embedding_aggregation() -> str:
    """Run one embedding aggregation tick on demand.

    Fetches the aggregator via ``get_federated_aggregator()`` and calls its
    ``embedding_tick()`` method once.

    Returns:
        The tick result serialized as JSON, or ``{"error": ...}`` when any
        exception (including a failed import) is raised.
    """
    try:
        from .federated_aggregator import get_federated_aggregator

        tick_result = get_federated_aggregator().embedding_tick()
        return json.dumps(tick_result)
    except Exception as exc:
        return json.dumps({'error': str(exc)})

97 

98 

99# ─── Phase 2 Stubs: LoRA Gradient Sync ─── 

100 

def submit_lora_gradient(node_id: str,
                         layer_name: str = '',
                         gradient_json: str = '{}') -> str:
    """[Phase 2 Stub] Submit a LoRA gradient for federated aggregation.

    LoRA gradients are sparse, rank-4, ~4KB/layer. Byzantine-resilient
    aggregation via Krum or coordinate-wise median is the planned design.

    Not yet implemented: none of the arguments are read, and the same
    rejection payload is returned on every call.

    Args:
        node_id: The submitting node's ID (ignored by the stub).
        layer_name: Name of the LoRA layer (ignored by the stub).
        gradient_json: JSON-encoded gradient (ignored by the stub).

    Returns:
        JSON string with ``accepted`` set to false, a ``reason`` of
        ``phase2_not_implemented`` and a human-readable ``description``.
    """
    stub_response = {
        'accepted': False,
        'reason': 'phase2_not_implemented',
        'description': (
            'LoRA gradient sync is Phase 2. '
            'Use submit_embedding_delta for Phase 1 embedding sync.'
        ),
    }
    return json.dumps(stub_response)

117 

118 

def get_byzantine_aggregation_status() -> str:
    """[Phase 2 Stub] Get Byzantine-resilient aggregation status.

    The planned implementation will use Krum or coordinate-wise median for
    LoRA gradient aggregation.

    Not yet implemented: the same placeholder payload is returned on every
    call.

    Returns:
        JSON string with ``status`` set to ``not_implemented``, ``phase``
        set to 2 and a human-readable ``description``.
    """
    stub_response = {
        'status': 'not_implemented',
        'phase': 2,
        'description': (
            'Byzantine aggregation will be available in Phase 2. '
            'Phase 1 uses trimmed-mean for embedding sync.'
        ),
    }
    return json.dumps(stub_response)

132 

133 

134# ─── Tool Registration ─── 

135 

# Registry of the gradient sync tools defined in this module. Each entry is a
# dict with the keys 'name', 'func', 'description' and 'tags'; every tool gets
# its own ['gradient_sync'] tag list.
GRADIENT_TOOLS = [
    {
        'name': tool_name,
        'func': tool_func,
        'description': tool_description,
        'tags': ['gradient_sync'],
    }
    for tool_name, tool_func, tool_description in (
        ('submit_embedding_delta',
         submit_embedding_delta,
         'Submit a compressed embedding delta for distributed aggregation'),
        ('get_gradient_sync_status',
         get_gradient_sync_status,
         'Get distributed embedding sync convergence status'),
        ('request_embedding_witnesses',
         request_embedding_witnesses,
         'Request peer witnesses for an embedding delta'),
        ('trigger_embedding_aggregation',
         trigger_embedding_aggregation,
         'Manually trigger embedding delta aggregation round'),
        ('submit_lora_gradient',
         submit_lora_gradient,
         '[Phase 2] Submit LoRA gradient for federated aggregation'),
        ('get_byzantine_aggregation_status',
         get_byzantine_aggregation_status,
         '[Phase 2] Get Byzantine-resilient aggregation status'),
    )
]