Coverage for integrations / agent_engine / embedding_delta.py: 92.9%
141 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Embedding Delta — Compression, validation, aggregation, and anomaly detection.

Pure functions operating on plain Python lists of floats (no numpy
dependency). Used by gradient_service.py for distributed embedding
synchronization across HART nodes.

Phase 1: Embedding sync (compressed representation deltas, <100KB per round).
Phase 2: LoRA gradient sync (stubs in federated_gradient_protocol.py).

Intelligence is earned through contribution. Every compute cycle donated
makes the hive smarter. 90% of value flows back to contributors.
"""
import logging
import math
from typing import Dict, List, Optional, Tuple

# Project-wide logger name rather than __name__ — presumably so all hevolve
# modules aggregate under one handler configured elsewhere; verify against
# the host application's logging setup.
logger = logging.getLogger('hevolve_social')
# ─── Constants ───

MAX_DELTA_SIZE_BYTES = 102_400  # 100KB hard cap per transmitted delta
DEFAULT_TOP_K = 32  # Default number of components kept by top-k compression
MIN_PEERS_FOR_AGGREGATION = 1  # Minimum peers needed for trimmed mean
# NOTE(review): MIN_PEERS_FOR_AGGREGATION is not referenced in this module —
# presumably consumed by gradient_service.py; confirm before removing.
ANOMALY_SIGMA = 3.0  # Z-score threshold for magnitude anomaly / trimming
DIRECTION_FLIP_THRESHOLD = -0.5  # Cosine similarity below this = direction flip
MAX_DIMENSION = 8192  # Maximum embedding dimension we accept (excess truncated)
29# ─── Compression ───
def compress_delta(raw_values: list,
                   method: str = 'top_k',
                   k: int = DEFAULT_TOP_K) -> Dict:
    """Compress an embedding delta for transmission.

    Retains only the k components with the largest absolute value. When
    ``method`` is 'none', or k already covers the whole vector, the full
    (rounded) vector is returned instead. Inputs longer than MAX_DIMENSION
    are silently truncated before compression.

    Args:
        raw_values: Embedding delta as a flat list of floats.
        method: Compression method ('top_k' or 'none').
        k: Number of components to keep.

    Returns:
        {'method': str, 'k': int, 'dimension': int,
         'indices': [int], 'values': [float], 'magnitude': float}
    """
    if not raw_values:
        return {'method': method, 'k': 0, 'dimension': 0,
                'indices': [], 'values': [], 'magnitude': 0.0}

    # Enforce the dimension cap before anything else.
    if len(raw_values) > MAX_DIMENSION:
        raw_values = raw_values[:MAX_DIMENSION]
    dimension = len(raw_values)

    # L2 norm of the (possibly truncated) vector.
    magnitude = round(_magnitude(raw_values), 8)

    if method == 'none' or k >= dimension:
        # Dense passthrough: every index present, values rounded for wire.
        return {'method': 'none', 'k': dimension, 'dimension': dimension,
                'indices': list(range(dimension)),
                'values': [round(v, 8) for v in raw_values],
                'magnitude': magnitude}

    # Rank all components by |value|, keep the k strongest, then restore
    # ascending index order (indices are unique, so plain sort suffices).
    ranked = sorted(enumerate(raw_values),
                    key=lambda pair: abs(pair[1]), reverse=True)[:k]
    ranked.sort()

    return {'method': 'top_k', 'k': k, 'dimension': dimension,
            'indices': [idx for idx, _ in ranked],
            'values': [round(val, 8) for _, val in ranked],
            'magnitude': magnitude}
def decompress_delta(compressed: Dict) -> list:
    """Expand a sparse delta back to a dense list of length ``dimension``.

    Positions not named in 'indices' stay 0.0; out-of-range indices are
    silently dropped. Returns [] when dimension is missing or non-positive.
    """
    dimension = compressed.get('dimension', 0)
    if dimension <= 0:
        return []

    dense = [0.0] * dimension
    pairs = zip(compressed.get('indices', []), compressed.get('values', []))
    for position, value in pairs:
        if 0 <= position < dimension:
            dense[position] = value
    return dense
101# ─── Validation ───
def validate_delta(delta: Dict) -> Tuple[bool, str]:
    """Validate a compressed embedding delta for correctness and size.

    Checks structure (dict with a positive int dimension ≤ MAX_DIMENSION),
    index sanity (genuine ints in range, no duplicates), value sanity
    (finite real numbers), and an estimated wire size against
    MAX_DELTA_SIZE_BYTES.

    Returns:
        (valid, reason) — reason is 'ok' on success, otherwise a short
        machine-readable failure code.
    """
    if not isinstance(delta, dict):
        return False, 'not_a_dict'

    dimension = delta.get('dimension', 0)
    # bool is a subclass of int in Python, so exclude it explicitly —
    # True/False must never be accepted as a dimension.
    if isinstance(dimension, bool) or not isinstance(dimension, int) \
            or dimension <= 0:
        return False, 'invalid_dimension'
    if dimension > MAX_DIMENSION:
        return False, f'dimension_too_large ({dimension} > {MAX_DIMENSION})'

    indices = delta.get('indices', [])
    values = delta.get('values', [])
    if len(indices) != len(values):
        return False, 'indices_values_length_mismatch'

    # Indices must be genuine ints (bools rejected) within [0, dimension).
    for idx in indices:
        if isinstance(idx, bool) or not isinstance(idx, int) \
                or idx < 0 or idx >= dimension:
            return False, f'invalid_index ({idx})'

    # Check for duplicates
    if len(set(indices)) != len(indices):
        return False, 'duplicate_indices'

    # Values must be finite real numbers; bools are rejected even though
    # they subclass int.
    for v in values:
        if isinstance(v, bool) or not isinstance(v, (int, float)):
            return False, 'non_numeric_value'
        if math.isnan(v) or math.isinf(v):
            return False, 'nan_or_inf_value'

    # Size estimate (rough: 12 bytes per index+value pair + overhead)
    estimated_size = len(indices) * 12 + 100
    if estimated_size > MAX_DELTA_SIZE_BYTES:
        return False, f'estimated_size_too_large ({estimated_size})'

    return True, 'ok'
146# ─── Aggregation ───
def trimmed_mean_aggregate(deltas: List[Dict],
                           sigma: float = ANOMALY_SIGMA,
                           weights: Optional[List[float]] = None) -> Dict:
    """Aggregate multiple embedding deltas using a trimmed weighted mean.

    Contributions whose magnitude lies more than ``sigma`` standard
    deviations from the population mean are dropped before averaging;
    if that would drop every peer, all peers are kept instead. This is
    Byzantine-resilient for Phase 1.

    Args:
        deltas: Compressed delta dicts (see compress_delta).
        sigma: Z-score cutoff for outlier trimming.
        weights: Optional per-peer weights aligned with ``deltas``;
            missing entries default to 1.0, with a floor of 0.01.

    Returns:
        A re-compressed delta dict annotated with 'peer_count' and
        'outliers_removed'.
    """
    empty_result = {'method': 'aggregated', 'k': 0, 'dimension': 0,
                    'indices': [], 'values': [], 'magnitude': 0.0,
                    'peer_count': len(deltas), 'outliers_removed': 0}
    if not deltas:
        return empty_result

    # Single contributor: nothing to trim, pass it through annotated.
    if len(deltas) == 1:
        only = dict(deltas[0])
        only['peer_count'] = 1
        only['outliers_removed'] = 0
        return only

    dimension = max(d.get('dimension', 0) for d in deltas)
    if dimension <= 0:
        return empty_result

    # Densify every delta to a common length (pad with zeros / truncate).
    dense = [(decompress_delta(d) + [0.0] * dimension)[:dimension]
             for d in deltas]
    magnitudes = [_magnitude(vec) for vec in dense]

    # Magnitude-based outlier trimming.
    mask = _detect_outliers(magnitudes, sigma)
    outliers_removed = sum(mask)

    kept = []
    kept_weights = []
    for i, vec in enumerate(dense):
        if mask[i]:
            continue
        w = weights[i] if weights and i < len(weights) else 1.0
        kept.append(vec)
        kept_weights.append(max(0.01, w))

    if not kept:
        # Everyone looked anomalous — degrade to an unweighted mean of all.
        kept = dense
        kept_weights = [1.0] * len(dense)
        outliers_removed = 0

    # Weighted mean, component by component (same accumulation order as a
    # peer-outer loop, so results are bit-identical).
    total = sum(kept_weights)
    mean_vec = [sum(vec[j] * (w / total) for vec, w in zip(kept, kept_weights))
                for j in range(dimension)]

    result = compress_delta(mean_vec, method='top_k',
                            k=min(DEFAULT_TOP_K, dimension))
    result['peer_count'] = len(deltas)
    result['outliers_removed'] = outliers_removed
    return result
229# ─── Anomaly Detection ───
def detect_magnitude_anomaly(magnitude: float,
                             peer_magnitudes: List[float],
                             sigma: float = ANOMALY_SIGMA) -> bool:
    """Flag a delta whose magnitude is anomalous vs the peer population.

    Used by IntegrityService for the gradient_magnitude_anomaly fraud
    signal. Needs at least two peers to form a baseline.
    """
    population = len(peer_magnitudes)
    if population < 2:
        return False

    center = sum(peer_magnitudes) / population
    variance = sum((m - center) ** 2 for m in peer_magnitudes) / population
    spread = math.sqrt(variance) if variance > 0 else 0.0

    # Degenerate population (all peers essentially identical): any
    # measurable deviation counts as anomalous.
    if spread < 1e-10:
        return abs(magnitude - center) > 1e-6

    return abs(magnitude - center) / spread > sigma
def detect_direction_flip(current_values: list,
                          previous_values: list) -> bool:
    """Report whether the delta reversed direction vs the previous round.

    A flip (cosine similarity below DIRECTION_FLIP_THRESHOLD, i.e. < -0.5)
    indicates potential adversarial gradient manipulation. Vectors are
    compared over their common prefix; empty input is never a flip.
    """
    if not current_values or not previous_values:
        return False

    overlap = min(len(current_values), len(previous_values))
    if overlap == 0:
        return False

    similarity = _cosine_similarity(current_values[:overlap],
                                    previous_values[:overlap])
    return similarity < DIRECTION_FLIP_THRESHOLD
272# ─── Internal Helpers ───
274def _magnitude(values: list) -> float:
275 """L2 norm of a vector."""
276 return math.sqrt(sum(v * v for v in values)) if values else 0.0
279def _cosine_similarity(a: list, b: list) -> float:
280 """Cosine similarity between two vectors."""
281 dot = sum(x * y for x, y in zip(a, b))
282 norm_a = math.sqrt(sum(x * x for x in a))
283 norm_b = math.sqrt(sum(x * x for x in b))
284 if norm_a < 1e-10 or norm_b < 1e-10:
285 return 0.0
286 return dot / (norm_a * norm_b)
289def _detect_outliers(values: List[float], sigma: float) -> List[bool]:
290 """Return boolean mask: True if value is > sigma stddevs from mean."""
291 if len(values) < 3:
292 return [False] * len(values)
294 mean_v = sum(values) / len(values)
295 variance = sum((v - mean_v) ** 2 for v in values) / len(values)
296 stddev = math.sqrt(variance) if variance > 0 else 0.0
298 if stddev < 1e-10:
299 return [False] * len(values)
301 return [abs(v - mean_v) / stddev > sigma for v in values]