Coverage for integrations / agent_engine / embedding_delta.py: 92.9%

141 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Embedding Delta — Compression, validation, aggregation, and anomaly detection. 

3 

4Pure functions operating on numpy arrays. Used by gradient_service.py for 

5distributed embedding synchronization across HART nodes. 

6 

7Phase 1: Embedding sync (compressed representation deltas, <100KB per round). 

8Phase 2: LoRA gradient sync (stubs in federated_gradient_protocol.py). 

9 

10Intelligence is earned through contribution. Every compute cycle donated 

11makes the hive smarter. 90% of value flows back to contributors. 

12""" 

13import logging 

14import math 

15from typing import Dict, List, Optional, Tuple 

16 

# NOTE(review): logger uses the app-wide name 'hevolve_social' rather than
# __name__ — presumably so one handler config covers all hive modules; confirm.
logger = logging.getLogger('hevolve_social')

18 

# ─── Constants ───

MAX_DELTA_SIZE_BYTES = 102_400 # 100KB hard cap per delta (enforced by validate_delta)
DEFAULT_TOP_K = 32 # Components kept by compress_delta (largest |value|, not PCA)
MIN_PEERS_FOR_AGGREGATION = 1 # Minimum peers needed for trimmed mean
# NOTE(review): MIN_PEERS_FOR_AGGREGATION is not referenced anywhere in this
# file — presumably consumed by callers (gradient_service.py?); verify before removing.
ANOMALY_SIGMA = 3.0 # Z-score threshold for magnitude anomaly
DIRECTION_FLIP_THRESHOLD = -0.5 # Cosine similarity below this = direction flip
MAX_DIMENSION = 8192 # Maximum embedding dimension we accept

27 

28 

29# ─── Compression ─── 

30 

def compress_delta(raw_values: list,
                   method: str = 'top_k',
                   k: int = DEFAULT_TOP_K) -> Dict:
    """Compress an embedding delta for transmission.

    Keeps only the top-k components by absolute magnitude. With
    method='none' (or when k >= dimension) the full dense vector is
    returned instead. Inputs longer than MAX_DIMENSION are truncated.

    Args:
        raw_values: List of float values (embedding delta).
        method: Compression method ('top_k' or 'none'). Any other string
            is treated as 'top_k'.
        k: Number of components to keep; negative values are treated as 0.

    Returns:
        {'method': str, 'k': int, 'dimension': int,
         'indices': [int], 'values': [float], 'magnitude': float}
    """
    if not raw_values:
        return {'method': method, 'k': 0, 'dimension': 0,
                'indices': [], 'values': [], 'magnitude': 0.0}

    # Guard: a negative k would make indexed[:k] drop the |k| LARGEST
    # components (Python negative slicing) instead of keeping them.
    k = max(0, k)

    dimension = len(raw_values)
    if dimension > MAX_DIMENSION:
        raw_values = raw_values[:MAX_DIMENSION]
        dimension = MAX_DIMENSION

    magnitude = _magnitude(raw_values)

    if method == 'none' or k >= dimension:
        # Nothing to drop — ship the dense vector.
        return {
            'method': 'none', 'k': dimension, 'dimension': dimension,
            'indices': list(range(dimension)),
            'values': [round(v, 8) for v in raw_values],
            'magnitude': round(magnitude, 8),
        }

    # Top-k by absolute value (stable sort: ties keep lower index first),
    # then restored to ascending index order for a canonical layout.
    indexed = sorted(enumerate(raw_values), key=lambda x: abs(x[1]), reverse=True)
    top = sorted(indexed[:k])  # tuples sort by index (first element)

    return {
        'method': 'top_k',
        'k': k,
        'dimension': dimension,
        'indices': [i for i, _ in top],
        'values': [round(v, 8) for _, v in top],
        'magnitude': round(magnitude, 8),
    }

81 

82 

def decompress_delta(compressed: Dict) -> list:
    """Expand a sparse compressed delta back to a full dense vector.

    Positions not listed in 'indices' are filled with 0.0; out-of-range
    indices are silently skipped.
    """
    dim = compressed.get('dimension', 0)
    if dim <= 0:
        return []

    dense = [0.0] * dim
    pairs = zip(compressed.get('indices', []), compressed.get('values', []))
    for position, component in pairs:
        if 0 <= position < dim:
            dense[position] = component
    return dense

99 

100 

101# ─── Validation ─── 

102 

def validate_delta(delta: Dict) -> Tuple[bool, str]:
    """Validate a compressed embedding delta for correctness and size.

    Checks structure, dimension bounds, index validity/uniqueness, value
    finiteness, and an estimated wire size against MAX_DELTA_SIZE_BYTES.

    Returns: (valid: bool, reason: str) — reason is 'ok' when valid.
    """
    if not isinstance(delta, dict):
        return False, 'not_a_dict'

    dimension = delta.get('dimension', 0)
    # bool is a subclass of int — reject it explicitly so e.g. True does
    # not sneak through as dimension 1.
    if isinstance(dimension, bool) or not isinstance(dimension, int) \
            or dimension <= 0:
        return False, 'invalid_dimension'
    if dimension > MAX_DIMENSION:
        return False, f'dimension_too_large ({dimension} > {MAX_DIMENSION})'

    indices = delta.get('indices', [])
    values = delta.get('values', [])
    if len(indices) != len(values):
        return False, 'indices_values_length_mismatch'

    # Every index must be a real int (not bool) inside [0, dimension).
    for idx in indices:
        if isinstance(idx, bool) or not isinstance(idx, int) \
                or idx < 0 or idx >= dimension:
            return False, f'invalid_index ({idx})'

    # No index may appear twice.
    if len(set(indices)) != len(indices):
        return False, 'duplicate_indices'

    # Values must be finite numbers.
    for v in values:
        if not isinstance(v, (int, float)):
            return False, 'non_numeric_value'
        if math.isnan(v) or math.isinf(v):
            return False, 'nan_or_inf_value'

    # Size estimate (rough: 12 bytes per index+value pair + overhead)
    estimated_size = len(indices) * 12 + 100
    if estimated_size > MAX_DELTA_SIZE_BYTES:
        return False, f'estimated_size_too_large ({estimated_size})'

    return True, 'ok'

144 

145 

146# ─── Aggregation ─── 

147 

def trimmed_mean_aggregate(deltas: List[Dict],
                           sigma: float = ANOMALY_SIGMA,
                           weights: Optional[List[float]] = None) -> Dict:
    """Aggregate multiple embedding deltas using a trimmed, weighted mean.

    Contributions whose magnitude is more than `sigma` standard deviations
    from the population mean are dropped before averaging. This is
    Byzantine-resilient for Phase 1. If every contribution is flagged as
    an outlier, falls back to an unweighted mean of all of them.

    Args:
        deltas: List of compressed delta dicts (see compress_delta).
        sigma: Z-score threshold for outlier removal.
        weights: Optional per-delta weights (e.g., contribution_score);
            missing/short entries default to 1.0, all floored at 0.01.

    Returns:
        Aggregated compressed delta dict with 'peer_count' and
        'outliers_removed' added.
    """
    if not deltas:
        return {'method': 'aggregated', 'k': 0, 'dimension': 0,
                'indices': [], 'values': [], 'magnitude': 0.0,
                'peer_count': 0, 'outliers_removed': 0}

    if len(deltas) == 1:
        # Single contributor: nothing to trim or average.
        result = dict(deltas[0])
        result['peer_count'] = 1
        result['outliers_removed'] = 0
        return result

    # Target dimension is the widest contribution; narrower ones get padded.
    dimension = max(d.get('dimension', 0) for d in deltas)
    if dimension <= 0:
        return {'method': 'aggregated', 'k': 0, 'dimension': 0,
                'indices': [], 'values': [], 'magnitude': 0.0,
                'peer_count': len(deltas), 'outliers_removed': 0}

    # Decompress every delta and normalize all to the same dimension.
    full_deltas = []
    magnitudes = []
    for d in deltas:
        full = decompress_delta(d)
        if len(full) < dimension:
            full.extend([0.0] * (dimension - len(full)))
        elif len(full) > dimension:
            full = full[:dimension]
        full_deltas.append(full)
        magnitudes.append(_magnitude(full))

    # Flag magnitude outliers (> sigma stddevs from the population mean).
    outlier_mask = _detect_outliers(magnitudes, sigma)
    outliers_removed = sum(outlier_mask)

    # Keep only inliers, pairing each with its (floored) weight.
    filtered_deltas = []
    filtered_weights = []
    for i, (fd, is_outlier) in enumerate(zip(full_deltas, outlier_mask)):
        if not is_outlier:
            filtered_deltas.append(fd)
            w = weights[i] if weights and i < len(weights) else 1.0
            filtered_weights.append(max(0.01, w))  # floor: no peer fully zeroed

    if not filtered_deltas:
        # All were outliers — fall back to simple mean of all
        filtered_deltas = full_deltas
        filtered_weights = [1.0] * len(full_deltas)
        outliers_removed = 0

    # Weighted mean. Hoist the per-peer normalization out of the inner
    # loop: the original divided w / total_weight once per component.
    total_weight = sum(filtered_weights)
    aggregated_values = [0.0] * dimension
    for fd, w in zip(filtered_deltas, filtered_weights):
        scale = w / total_weight
        for j in range(dimension):
            aggregated_values[j] += fd[j] * scale

    # Re-compress the result and annotate with aggregation metadata.
    compressed = compress_delta(aggregated_values, method='top_k',
                                k=min(DEFAULT_TOP_K, dimension))
    compressed['peer_count'] = len(deltas)
    compressed['outliers_removed'] = outliers_removed
    return compressed

227 

228 

229# ─── Anomaly Detection ─── 

230 

def detect_magnitude_anomaly(magnitude: float,
                             peer_magnitudes: List[float],
                             sigma: float = ANOMALY_SIGMA) -> bool:
    """Flag a delta whose magnitude is an outlier vs the peer population.

    Used by IntegrityService for the gradient_magnitude_anomaly fraud
    signal. Needs at least two peer magnitudes to say anything.
    """
    n = len(peer_magnitudes)
    if n < 2:
        return False

    mean_m = sum(peer_magnitudes) / n
    spread = math.sqrt(sum((m - mean_m) ** 2 for m in peer_magnitudes) / n)

    # Degenerate population: all magnitudes essentially identical, so any
    # measurable deviation from the shared value counts as anomalous.
    if spread < 1e-10:
        return abs(magnitude - mean_m) > 1e-6

    return abs(magnitude - mean_m) / spread > sigma

251 

252 

def detect_direction_flip(current_values: list,
                          previous_values: list) -> bool:
    """Flag an embedding delta pointing opposite to the previous round's.

    Cosine similarity below DIRECTION_FLIP_THRESHOLD (-0.5) indicates
    potential adversarial gradient manipulation. Vectors of unequal
    length are compared over their common prefix.
    """
    if not current_values or not previous_values:
        return False

    overlap = min(len(current_values), len(previous_values))
    if overlap == 0:
        return False

    similarity = _cosine_similarity(current_values[:overlap],
                                    previous_values[:overlap])
    return similarity < DIRECTION_FLIP_THRESHOLD

270 

271 

272# ─── Internal Helpers ─── 

273 

274def _magnitude(values: list) -> float: 

275 """L2 norm of a vector.""" 

276 return math.sqrt(sum(v * v for v in values)) if values else 0.0 

277 

278 

279def _cosine_similarity(a: list, b: list) -> float: 

280 """Cosine similarity between two vectors.""" 

281 dot = sum(x * y for x, y in zip(a, b)) 

282 norm_a = math.sqrt(sum(x * x for x in a)) 

283 norm_b = math.sqrt(sum(x * x for x in b)) 

284 if norm_a < 1e-10 or norm_b < 1e-10: 

285 return 0.0 

286 return dot / (norm_a * norm_b) 

287 

288 

289def _detect_outliers(values: List[float], sigma: float) -> List[bool]: 

290 """Return boolean mask: True if value is > sigma stddevs from mean.""" 

291 if len(values) < 3: 

292 return [False] * len(values) 

293 

294 mean_v = sum(values) / len(values) 

295 variance = sum((v - mean_v) ** 2 for v in values) / len(values) 

296 stddev = math.sqrt(variance) if variance > 0 else 0.0 

297 

298 if stddev < 1e-10: 

299 return [False] * len(values) 

300 

301 return [abs(v - mean_v) / stddev > sigma for v in values]