Coverage for integrations/agent_engine/federated_aggregator.py: 83.0% (569 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Agent Goal Engine - Federated Learning Delta Aggregation 

3 

4Periodic aggregation of learning metrics across HART nodes via gossip. 

5Complementary to HiveMind's inference-time tensor fusion — this handles 

6training-time metric synchronization. 

7 

8Lifecycle (driven by AgentDaemon._tick every 2nd tick): 

9 1. extract_local_delta() — pull metrics from WorldModelBridge 

10 2. broadcast_delta() — sign + POST to peers 

11 3. receive_peer_delta() — called by Flask endpoint 

12 4. aggregate() — weighted FedAvg on metrics 

13 5. apply_aggregated() — store for dashboard + benchmark consumption 

14 6. track_convergence() — variance-based convergence score 

15""" 

16import hashlib 

17import hmac 

18import json 

19import logging 

20import math 

21import os 

22import threading 

23import time 

24from typing import Dict, List, Optional, Tuple 

25 

26logger = logging.getLogger('hevolve_social') 

27 

28DELTA_VERSION = 1 

29DELTA_MAX_AGE_SECONDS = 3600 # 1 hour freshness window 

30 

31# ── G8: Per-node HMAC secret (generated at first boot) ── 

32# Default to user-writable dir — installed builds at Program Files are read-only. 

33_HMAC_SECRET_PATH = os.path.join( 

34 os.environ.get('HEVOLVE_AGENT_DATA', 

35 os.path.join(os.path.expanduser('~'), '.nunba', 'agent_data')), 

36 '.hmac_secret') 

37 

38 

39def _load_or_create_hmac_secret() -> str: 

40 """Load per-node HMAC secret from disk, or generate on first boot. 

41 

42 The secret is 32 random bytes (hex-encoded, 64 chars) stored at 

43 agent_data/.hmac_secret. This replaces the old HART_NODE_KEY env var 

44 and Ed25519-public-key fallback with a proper per-node secret that is 

45 never transmitted in cleartext. 

46 """ 

47 try: 

48 if os.path.isfile(_HMAC_SECRET_PATH): 

49 with open(_HMAC_SECRET_PATH, 'r') as f: 

50 secret = f.read().strip() 

51 if len(secret) >= 32: 

52 return secret 

53 except (OSError, PermissionError) as e: 

54 logger.warning(f'Cannot read HMAC secret ({e}), regenerating') 

55 

56 # Generate new secret 

57 secret = os.urandom(32).hex() 

58 try: 

59 os.makedirs(os.path.dirname(_HMAC_SECRET_PATH), exist_ok=True) 

60 with open(_HMAC_SECRET_PATH, 'w') as f: 

61 f.write(secret) 

62 # Restrict permissions (owner read/write only) 

63 try: 

64 import stat 

65 os.chmod(_HMAC_SECRET_PATH, stat.S_IRUSR | stat.S_IWUSR) 

66 except (OSError, NotImplementedError): 

67 pass # Windows doesn't support POSIX chmod the same way 

68 logger.info(f'Generated per-node HMAC secret at {_HMAC_SECRET_PATH}') 

69 except (OSError, PermissionError) as e: 

70 logger.warning(f'Cannot persist HMAC secret ({e}), using ephemeral') 

71 

72 return secret 

73 

74 

# Cache the secret at module load
_NODE_HMAC_SECRET: str = ''


def _get_hmac_secret() -> str:
    """Return the per-node HMAC secret (lazy-loaded, cached)."""
    global _NODE_HMAC_SECRET
    if not _NODE_HMAC_SECRET:
        _NODE_HMAC_SECRET = _load_or_create_hmac_secret()
    return _NODE_HMAC_SECRET


def _sign_delta(delta_dict):
    """Sign a federation delta with per-node HMAC-SHA256 secret.

    Uses the persistent per-node secret from agent_data/.hmac_secret
    (G8 fix — replaces the old HART_NODE_KEY env var / Ed25519 public
    key fallback which was a hardcoded/default key vulnerability).
    """
    node_key = _get_hmac_secret()
    if not node_key:
        logger.error('HMAC secret unavailable — delta UNSIGNED')
        return delta_dict
    # Work on a copy without any existing hmac_signature
    to_sign = {k: v for k, v in delta_dict.items() if k != 'hmac_signature'}
    payload = json.dumps(to_sign, sort_keys=True).encode()
    sig = hmac.new(node_key.encode(), payload, hashlib.sha256).hexdigest()
    delta_dict['hmac_signature'] = sig
    return delta_dict


def _verify_delta_signature(delta_dict):
    """Verify a received federation delta's HMAC-SHA256 signature.

    G8: For verification we need the sender's HMAC secret, which is
    exchanged during the federation handshake (signed by the node's
    Ed25519 key). We check:
     1. Our own per-node secret (for self-originated deltas)
     2. The sender's Ed25519-signed HMAC secret (from the handshake cache)
     3. Legacy: the sender's Ed25519 public key as fallback
    """
    sig = delta_dict.get('hmac_signature', '')
    if not sig:
        return False
    to_verify = {k: v for k, v in delta_dict.items() if k != 'hmac_signature'}
    payload = json.dumps(to_verify, sort_keys=True).encode()

    # 1. Try our own per-node HMAC secret (self-test / same-node)
    our_secret = _get_hmac_secret()
    if our_secret:
        expected = hmac.new(our_secret.encode(), payload, hashlib.sha256).hexdigest()
        if hmac.compare_digest(sig, expected):
            return True

    # 2. Try peer's exchanged HMAC secret from handshake cache
    sender_node_id = delta_dict.get('node_id', '')
    if sender_node_id:
        peer_secret = _get_peer_hmac_secret(sender_node_id)
        if peer_secret:
            expected = hmac.new(peer_secret.encode(), payload, hashlib.sha256).hexdigest()
            if hmac.compare_digest(sig, expected):
                return True

    # 3. Legacy fallback: sender used their Ed25519 public key as HMAC key
    sender_pubkey = delta_dict.get('public_key', '')
    if sender_pubkey:
        expected = hmac.new(sender_pubkey.encode(), payload, hashlib.sha256).hexdigest()
        if hmac.compare_digest(sig, expected):
            return True

    return False


# ── Peer HMAC secret exchange cache ──
_peer_hmac_secrets: Dict[str, str] = {}
_peer_hmac_lock = threading.Lock()


def register_peer_hmac_secret(node_id: str, secret: str):
    """Store a peer's HMAC secret received during federation handshake."""
    with _peer_hmac_lock:
        _peer_hmac_secrets[node_id] = secret


def _get_peer_hmac_secret(node_id: str) -> str:
    """Retrieve a peer's HMAC secret from the handshake cache."""
    with _peer_hmac_lock:
        return _peer_hmac_secrets.get(node_id, '')


def get_hmac_secret_for_handshake() -> str:
    """Return our HMAC secret for federation handshake exchange.

    The caller (federation handshake) should sign this with the node's
    Ed25519 key before transmitting to peers.
    """
    return _get_hmac_secret()

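# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a same-node round
# trip through the HMAC helpers above. The field values are made up; only the
# helper names and DELTA_VERSION come from this file. Signing with our own
# per-node secret means check 1 of _verify_delta_signature() accepts the
# delta when it is verified on the node that produced it.
# ---------------------------------------------------------------------------
def _example_hmac_roundtrip() -> bool:
    delta = {
        'version': DELTA_VERSION,
        'node_id': 'local-self-test',   # hypothetical node id
        'timestamp': time.time(),
    }
    _sign_delta(delta)                       # adds delta['hmac_signature']
    return _verify_delta_signature(delta)    # True via our own secret
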

class FederatedAggregator:
    """Periodic federated learning delta aggregation via gossip.

    Singleton via get_federated_aggregator(). tick() is called by AgentDaemon.
    """

    # Alarm on N consecutive failing epochs. Below this, ticks log at
    # .exception but federation keeps trying — one bad epoch is normal
    # (node churn, transient network, peer restart). At/above this,
    # log.error + consecutive_failure counter flips so a dashboard or
    # operator sees the sustained outage.
    _CONSECUTIVE_FAILURE_ALARM: int = 3

    def __init__(self):
        self._lock = threading.Lock()
        self._peer_deltas: Dict[str, dict] = {}  # node_id → latest delta
        self._local_delta: Optional[dict] = None
        self._epoch = 0
        self._convergence_history: List[float] = []
        self._last_aggregated: Optional[dict] = None
        # Consecutive-tick failure counter — reset on any success. Used
        # by the tick() silent-fail guard to distinguish transient from
        # sustained outages.
        self._consecutive_tick_failures: int = 0
        # Last exception seen per channel, exposed via get_status() so
        # dashboards can show "federation unhealthy: <reason>" instead
        # of an opaque stall.
        self._last_tick_error: Optional[str] = None
        # Exponential backoff for unreachable federation peers
        from core.circuit_breaker import PeerBackoff
        self._peer_backoff = PeerBackoff(initial=10, maximum=300)
        # Embedding delta channel (Phase 1 gradient sync)
        self._embedding_lock = threading.Lock()
        self._embedding_deltas: Dict[str, dict] = {}  # node_id → compressed delta
        self._embedding_epoch = 0
        self._last_embedding_aggregated: Optional[dict] = None
        # Model lifecycle delta channel (dynamic load/unload intelligence)
        self._lifecycle_lock = threading.Lock()
        self._lifecycle_deltas: Dict[str, dict] = {}  # node_id → model usage stats
        self._last_lifecycle_aggregated: Optional[dict] = None
        # Resonance tuning delta channel (personality tuning across nodes)
        self._resonance_lock = threading.Lock()
        self._resonance_deltas: Dict[str, dict] = {}  # node_id → anonymized tuning stats
        self._resonance_epoch = 0
        self._last_resonance_aggregated: Optional[dict] = None
        # Recipe sharing channel (trained task intelligence)
        self._recipe_lock = threading.Lock()
        self._recipe_deltas: Dict[str, dict] = {}  # node_id → recipe catalog
        self._last_recipe_aggregated: Optional[dict] = None
        # EventBus counters (fed by real-time events)
        self._event_counters_lock = threading.Lock()
        self._event_counters: Dict[str, int] = {}

        # Subscribe to EventBus (if platform is bootstrapped)
        self._subscribe_to_eventbus()


    def tick(self) -> dict:
        """Full cycle: extract → broadcast → aggregate → apply → track.

        Failure handling:
          * One failing epoch is logged at .exception (full traceback)
            so operators can correlate with peer-side errors.
          * ``_consecutive_tick_failures`` tracks the run of failures;
            any successful outer try-body resets it. When the count
            hits ``_CONSECUTIVE_FAILURE_ALARM`` (3 by default) we flip
            to log.error level and stamp ``result['alarm']`` so any
            dashboard / metric scraper sees the sustained outage.
          * The tick method itself always returns — callers (AgentDaemon
            tick loop) must be able to skip one bad epoch and try again.
        """
        self._peer_backoff.prune_expired()
        # Determine local node tier once per tick for structured logs.
        try:
            from security.key_delegation import get_node_tier
            node_tier = get_node_tier()
        except Exception:
            node_tier = 'unknown'

        result = {'epoch': self._epoch, 'aggregated': False}
        try:
            self._local_delta = self.extract_local_delta()
            if self._local_delta:
                self.broadcast_delta(self._local_delta)

            aggregated = self.aggregate()
            if aggregated:
                self.apply_aggregated(aggregated)
                convergence = self.track_convergence()
                self._epoch += 1
                result.update({
                    'aggregated': True,
                    'epoch': self._epoch,
                    'convergence': convergence,
                    'peer_count': len(self._peer_deltas),
                })

            # Embedding channel tick (Phase 1 gradient sync)
            embedding_result = self.embedding_tick()
            if embedding_result.get('aggregated'):
                result['embedding'] = embedding_result

            # Resonance channel tick (personality tuning across nodes)
            resonance_result = self.resonance_tick()
            if resonance_result.get('aggregated'):
                result['resonance'] = resonance_result

            # Success — reset the consecutive-failure window.
            if self._consecutive_tick_failures:
                logger.info(
                    f"[FederatedAggregator] tick recovered after "
                    f"{self._consecutive_tick_failures} consecutive failures "
                    f"(epoch={self._epoch}, tier={node_tier})")
            self._consecutive_tick_failures = 0
            self._last_tick_error = None
        except Exception as e:
            self._consecutive_tick_failures += 1
            self._last_tick_error = f"{type(e).__name__}: {e}"
            consecutive = self._consecutive_tick_failures
            result['error'] = str(e)
            result['consecutive_failures'] = consecutive

            # First N-1 failures: logger.exception (with traceback);
            # at/after N: logger.error + alarm flag. Either way, federation
            # keeps trying on the next tick.
            if consecutive >= self._CONSECUTIVE_FAILURE_ALARM:
                result['alarm'] = True
                logger.error(
                    f"[FederatedAggregator] ALARM: {consecutive} consecutive "
                    f"federation ticks failed (epoch={self._epoch}, "
                    f"tier={node_tier}): {e}",
                    exc_info=True)
            else:
                logger.exception(
                    f"[FederatedAggregator] tick failed "
                    f"(consecutive={consecutive}, epoch={self._epoch}, "
                    f"tier={node_tier}): {e}")
        return result


    def extract_local_delta(self) -> Optional[dict]:
        """Pull learning metrics from WorldModelBridge + HiveMind."""
        try:
            from .world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            stats = bridge.get_stats()
            learning_stats = bridge.get_learning_stats()

            # Get node identity for signing
            node_id = ''
            public_key = ''
            try:
                from security.node_integrity import get_node_identity
                identity = get_node_identity()
                node_id = identity.get('node_id', '')
                public_key = identity.get('public_key', '')
            except Exception:
                pass

            # Get guardrail hash
            guardrail_hash = ''
            try:
                from security.hive_guardrails import compute_guardrail_hash
                guardrail_hash = compute_guardrail_hash()
            except Exception:
                pass

            # Get capability tier
            capability_tier = 'standard'
            try:
                from security.system_requirements import get_tier_name
                capability_tier = get_tier_name()
            except Exception:
                pass

            # Get contribution score
            contribution_score = 0.0
            try:
                from integrations.social.models import get_db, PeerNode
                db = get_db()
                try:
                    node = db.query(PeerNode).filter_by(
                        node_id=node_id).first()
                    if node:
                        contribution_score = getattr(
                            node, 'contribution_score', 0.0) or 0.0
                finally:
                    db.close()
            except Exception:
                pass

            # Build delta
            hivemind_stats = learning_stats.get('hivemind', {})
            bridge_stats = learning_stats.get('bridge', {})

            delta = {
                'version': DELTA_VERSION,
                'node_id': node_id,
                'public_key': public_key,
                'guardrail_hash': guardrail_hash,
                'timestamp': time.time(),
                'experience_stats': {
                    'total_recorded': bridge_stats.get('total_recorded', 0),
                    'total_flushed': bridge_stats.get('total_flushed', 0),
                    'flush_rate': (
                        bridge_stats.get('total_flushed', 0) /
                        max(1, bridge_stats.get('total_recorded', 1))
                    ),
                },
                'ralt_stats': {
                    'skills_distributed': bridge_stats.get(
                        'total_skills_distributed', 0),
                    'skills_blocked': bridge_stats.get(
                        'total_skills_blocked', 0),
                    'acceptance_rate': (
                        bridge_stats.get('total_skills_distributed', 0) /
                        max(1, bridge_stats.get('total_skills_distributed', 0) +
                            bridge_stats.get('total_skills_blocked', 0))
                    ),
                },
                'hivemind_state': {
                    'agent_count': hivemind_stats.get('agent_count', 0),
                    'total_queries': bridge_stats.get(
                        'total_hivemind_queries', 0),
                    'avg_fusion_latency_ms': hivemind_stats.get(
                        'avg_fusion_latency_ms', 0),
                },
                'quality_metrics': {
                    'correction_density': bridge_stats.get(
                        'total_corrections', 0),
                    'success_rate': 0.0,
                    'goal_throughput': 0,
                },
                'benchmark_results': self._get_benchmark_results(),
                'capability_tier': capability_tier,
                'contribution_score': contribution_score,
                'event_counters': self.get_event_counters(),
            }

            # Sign the delta
            try:
                from security.node_integrity import sign_json_payload
                delta['signature'] = sign_json_payload(delta)
            except Exception:
                delta['signature'] = ''

            return delta
        except Exception as e:
            logger.debug(f"Federation extract error: {e}")
            return None

    def _get_benchmark_results(self) -> dict:
        """Pull latest benchmark results if BenchmarkRegistry exists."""
        results = {}
        try:
            from .benchmark_registry import get_benchmark_registry
            registry = get_benchmark_registry()
            results = registry.get_latest_results()
        except Exception:
            pass

        # Include coding agent benchmarks for hive tool routing intelligence
        try:
            from integrations.coding_agent.benchmark_tracker import get_benchmark_tracker
            coding_delta = get_benchmark_tracker().export_learning_delta()
            if coding_delta:
                results['coding_benchmarks'] = coding_delta.get('coding_benchmarks', {})
        except Exception:
            pass

        return results


    def broadcast_delta(self, delta: dict):
        """POST delta to all known active peers.

        Privacy enforcement: ScopeGuard.check_egress() runs before any data
        leaves this node. Only FEDERATED-scoped aggregate stats are sent.
        Raw user data, PII, and secrets are structurally blocked.
        """
        # ── Edge privacy gate: block PII / secrets from leaving ──
        try:
            from security.edge_privacy import get_scope_guard, PrivacyScope
            guard = get_scope_guard()
            tagged_delta = dict(delta, _privacy_scope=PrivacyScope.FEDERATED)
            allowed, reason = guard.check_egress(
                tagged_delta, PrivacyScope.FEDERATED,
                context={'source': 'federation_broadcast'}
            )
            if not allowed:
                logger.warning(f"Federation broadcast blocked by ScopeGuard: {reason}")
                return
        except ImportError:
            pass  # edge_privacy not available — proceed (defense in depth below)

        # Sign the delta with HMAC-SHA256 before broadcasting
        _sign_delta(delta)

        # Attach origin attestation so peers can verify we're genuine HART OS
        try:
            from security.origin_attestation import get_attestation_for_federation
            att = get_attestation_for_federation()
            if att.get('valid'):
                delta['origin_attestation'] = att['attestation']
        except Exception:
            pass

        try:
            from integrations.social.models import get_db, PeerNode
            from core.http_pool import pooled_post

            db = get_db()
            try:
                # Get our own backend port to detect self-connections
                try:
                    from core.port_registry import get_port
                    _own_port = get_port('backend')
                except Exception:
                    _own_port = 6777
                _self_urls = {
                    f'http://localhost:{_own_port}',
                    f'http://127.0.0.1:{_own_port}',
                    f'http://0.0.0.0:{_own_port}',
                }

                peers = db.query(PeerNode).filter_by(status='active').all()
                now = time.time()
                for peer in peers:
                    if not peer.url or peer.node_id == delta.get('node_id'):
                        continue
                    _peer_url = peer.url.rstrip('/')
                    # Skip our own node (bundled mode has no HTTP listener)
                    if _peer_url in _self_urls:
                        continue
                    if self._peer_backoff.is_backed_off(_peer_url):
                        continue
                    try:
                        url = f"{_peer_url}/api/social/peers/federation-delta"
                        pooled_post(url, json=delta, timeout=5)
                        self._peer_backoff.record_success(_peer_url)
                    except Exception:
                        self._peer_backoff.record_failure(_peer_url)
            finally:
                db.close()
        except Exception as e:
            logger.debug(f"Federation broadcast error: {e}")


    def receive_peer_delta(self, delta: dict) -> Tuple[bool, str]:
        """Validate and store an incoming peer delta.

        Validates: schema version, freshness, guardrail hash, Ed25519
        signature, HMAC-SHA256 signature, and origin attestation. Also
        honours master-key-signed revocation notices carried on deltas.
        """
        if not isinstance(delta, dict):
            return False, 'invalid payload'

        if delta.get('version') != DELTA_VERSION:
            return False, f'version mismatch (expected {DELTA_VERSION})'

        # Freshness check
        ts = delta.get('timestamp', 0)
        if abs(time.time() - ts) > DELTA_MAX_AGE_SECONDS:
            return False, 'delta too old or from the future'

        # Guardrail hash verification
        try:
            from security.hive_guardrails import compute_guardrail_hash
            local_hash = compute_guardrail_hash()
            if delta.get('guardrail_hash') and delta['guardrail_hash'] != local_hash:
                return False, 'guardrail hash mismatch'
        except ImportError:
            pass

        # Ed25519 signature verification — required in hard mode
        from security.master_key import get_enforcement_mode
        _enforcement = get_enforcement_mode()
        sig = delta.get('signature', '')
        if sig:
            try:
                from security.node_integrity import verify_json_signature
                if not verify_json_signature(delta.get('public_key', ''),
                                             delta, sig):
                    return False, 'invalid signature'
            except ImportError:
                logger.warning('Ed25519 verification module unavailable')
                if _enforcement == 'hard':
                    return False, 'Ed25519 module unavailable — cannot verify'
            except Exception as e:
                logger.warning(f'Ed25519 signature verification error: {e}')
                if _enforcement == 'hard':
                    return False, f'signature verification failed: {e}'
        elif _enforcement == 'hard':
            return False, 'missing Ed25519 signature (hard enforcement)'

        # HMAC-SHA256 delta signing verification — required in hard mode
        if delta.get('hmac_signature'):
            if not _verify_delta_signature(delta):
                return False, 'invalid HMAC signature'
        elif _enforcement == 'hard':
            return False, 'missing HMAC signature (hard enforcement)'

        # Origin attestation — reject forks and rebranded builds
        peer_attestation = delta.get('origin_attestation')
        if peer_attestation:
            try:
                from security.origin_attestation import verify_peer_attestation
                att_ok, att_msg = verify_peer_attestation(peer_attestation)
                if not att_ok:
                    return False, f'origin attestation failed: {att_msg}'
            except ImportError:
                pass  # Origin module not available — accept

        # Revocation check — master-key-signed network halt via federation
        revocation = delta.get('revocation')
        if revocation and isinstance(revocation, dict):
            rev_sig = revocation.get('master_signature', '')
            if rev_sig:
                try:
                    from security.master_key import verify_master_signature
                    rev_payload = {k: v for k, v in revocation.items()
                                   if k != 'master_signature'}
                    if verify_master_signature(rev_payload, rev_sig):
                        logger.critical(
                            'REVOCATION received via federation delta — '
                            'tripping circuit breaker: %s',
                            revocation.get('reason', 'no reason'))
                        try:
                            from security.hive_guardrails import HiveCircuitBreaker
                            HiveCircuitBreaker.trip(
                                reason=revocation.get('reason', 'revocation'))
                        except Exception as e:
                            logger.critical(f'Circuit breaker trip failed: {e}')
                        return True, 'revocation accepted'
                    else:
                        logger.warning('Revocation in delta has INVALID '
                                       'master signature — ignoring')
                except ImportError:
                    logger.warning('Cannot verify revocation — '
                                   'security modules missing')

        node_id = delta.get('node_id', '')
        if not node_id:
            return False, 'missing node_id'

        with self._lock:
            self._peer_deltas[node_id] = delta

        return True, 'accepted'


    def aggregate(self) -> Optional[dict]:
        """Weighted FedAvg across all peer deltas + local delta."""
        with self._lock:
            all_deltas = list(self._peer_deltas.values())
            if self._local_delta:
                all_deltas.append(self._local_delta)

        if len(all_deltas) < 1:
            return None

        # Equal voice: every node's intelligence counts the same.
        # Weight by data quality (interactions observed) not hardware tier.
        # A Raspberry Pi that served 10,000 users has more insight than
        # a GPU server that served 10. No one entity owns the built
        # intelligence — everyone is equal for this hive being.
        weights = []
        for d in all_deltas:
            interactions = (
                d.get('experience_stats', {}).get('total_recorded', 0) +
                d.get('quality_metrics', {}).get('goal_throughput', 0)
            )
            # Weight by log of interactions — diminishing returns prevents
            # any single high-traffic node from dominating
            w = math.log1p(max(0, interactions))
            weights.append(max(1.0, w))  # Floor at 1.0 — every node counts

        total_weight = sum(weights)

        # Weighted average of numeric metrics
        aggregated = {
            'epoch': self._epoch + 1,
            'peer_count': len(all_deltas),
            'timestamp': time.time(),
            'experience_stats': self._weighted_avg_dict(
                [d.get('experience_stats', {}) for d in all_deltas], weights, total_weight),
            'ralt_stats': self._weighted_avg_dict(
                [d.get('ralt_stats', {}) for d in all_deltas], weights, total_weight),
            'hivemind_state': self._weighted_avg_dict(
                [d.get('hivemind_state', {}) for d in all_deltas], weights, total_weight),
            'quality_metrics': self._weighted_avg_dict(
                [d.get('quality_metrics', {}) for d in all_deltas], weights, total_weight),
        }
        return aggregated

    def _weighted_avg_dict(self, dicts: list, weights: list,
                           total_weight: float) -> dict:
        """Compute weighted average of numeric values in list of dicts."""
        result = {}
        if not dicts:
            return result
        keys = set()
        for d in dicts:
            keys.update(d.keys())
        for key in keys:
            vals = []
            ws = []
            for d, w in zip(dicts, weights):
                v = d.get(key)
                if isinstance(v, (int, float)):
                    vals.append(v)
                    ws.append(w)
            if vals:
                result[key] = sum(v * w for v, w in zip(vals, ws)) / max(1e-10, sum(ws))
        return result

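    # Worked example of the weighting above (illustrative numbers, not from the
    # source): two deltas reporting total_recorded = 10000 and 10 interactions
    # get weights log1p(10000) ≈ 9.21 and log1p(10) ≈ 2.40, both above the 1.0
    # floor. For a metric with values 0.8 and 0.2, the weighted average is
    # (0.8 * 9.21 + 0.2 * 2.40) / (9.21 + 2.40) ≈ 0.68: the high-traffic node
    # pulls the result without being able to drown the other one out.
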

    def apply_aggregated(self, aggregated: dict):
        """Store aggregated metrics locally for dashboard + benchmark consumption.

        Single code path: routes through WorldModelBridge.apply_federation_update(),
        which owns storage AND the EventBus emit. This prevents the earlier
        dead path where apply_aggregated mutated bridge._federation_aggregated
        directly and emitted 'federation.aggregated' on its own, while the
        bridge's apply_federation_update() was never called by any caller.
        """
        self._last_aggregated = aggregated
        try:
            from .world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            bridge.apply_federation_update(aggregated)
        except Exception as exc:
            logger.debug(f"[FederatedAggregator] bridge.apply_federation_update failed: {exc}")

        # Feed hive-aggregated coding benchmarks back to local tool router
        coding_data = aggregated.get('benchmark_results', {}).get('coding_benchmarks')
        if coding_data:
            try:
                from integrations.coding_agent.benchmark_tracker import get_benchmark_tracker
                get_benchmark_tracker().import_hive_delta({'coding_benchmarks': coding_data})
            except Exception:
                pass

    def track_convergence(self) -> float:
        """Variance-based convergence score across peer deltas.

        Lower variance = higher convergence. Returns 0.0-1.0.
        """
        with self._lock:
            deltas = list(self._peer_deltas.values())

        if len(deltas) < 2:
            score = 1.0
        else:
            # Use flush_rate variance as proxy
            rates = [
                d.get('experience_stats', {}).get('flush_rate', 0)
                for d in deltas
            ]
            mean_rate = sum(rates) / len(rates)
            variance = sum((r - mean_rate) ** 2 for r in rates) / len(rates)
            score = 1.0 / (1.0 + variance * 100)

        self._convergence_history.append(score)
        if len(self._convergence_history) > 100:
            self._convergence_history = self._convergence_history[-100:]
        return score

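    # Worked example of the convergence score (illustrative numbers): peer
    # flush rates [0.50, 0.60] have mean 0.55 and variance 0.0025, giving
    # score = 1 / (1 + 0.0025 * 100) = 0.8. Identical rates score 1.0; widely
    # spread rates push the score toward 0, flagging that peers have diverged.
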

    # ─── Embedding Delta Channel (Phase 1 Gradient Sync) ───

    def receive_embedding_delta(self, node_id: str, delta: dict):
        """Store a compressed embedding delta from a peer node."""
        if not node_id or not isinstance(delta, dict):
            return
        with self._embedding_lock:
            self._embedding_deltas[node_id] = delta

    def aggregate_embeddings(self) -> Optional[dict]:
        """Aggregate all embedding deltas using trimmed mean."""
        with self._embedding_lock:
            deltas = list(self._embedding_deltas.values())
        if not deltas:
            return None

        try:
            from .embedding_delta import trimmed_mean_aggregate
            weights = []
            for d in deltas:
                cs = d.get('contribution_score', 1.0)
                weights.append(max(0.01, cs if isinstance(cs, (int, float)) else 1.0))

            aggregated = trimmed_mean_aggregate(deltas, weights=weights)
            self._last_embedding_aggregated = aggregated
            self._embedding_epoch += 1
            return aggregated
        except Exception as e:
            logger.debug(f"Embedding aggregation error: {e}")
            return None

    def embedding_tick(self) -> dict:
        """Embedding channel tick: aggregate + clear stale deltas."""
        result = {'embedding_epoch': self._embedding_epoch, 'aggregated': False}
        try:
            aggregated = self.aggregate_embeddings()
            if aggregated:
                result.update({
                    'aggregated': True,
                    'embedding_epoch': self._embedding_epoch,
                    'peer_count': aggregated.get('peer_count', 0),
                    'outliers_removed': aggregated.get('outliers_removed', 0),
                })
                # Clear processed deltas
                with self._embedding_lock:
                    self._embedding_deltas.clear()
        except Exception as e:
            result['error'] = str(e)
        return result

    def get_embedding_stats(self) -> dict:
        """Return embedding sync stats for dashboard."""
        with self._embedding_lock:
            pending = len(self._embedding_deltas)
        return {
            'embedding_epoch': self._embedding_epoch,
            'pending_deltas': pending,
            'last_aggregated': self._last_embedding_aggregated,
        }

    # ─── Model Lifecycle Delta Channel ───

    def receive_lifecycle_delta(self, node_id: str, delta: dict):
        """Store model usage stats from a peer node."""
        if not node_id or not isinstance(delta, dict):
            return
        with self._lifecycle_lock:
            self._lifecycle_deltas[node_id] = delta

    def aggregate_lifecycle(self) -> Optional[dict]:
        """Aggregate model popularity across all peers.

        Returns: {popularity: {model_name: 0.0-1.0}, peer_count: int}
        """
        with self._lifecycle_lock:
            deltas = list(self._lifecycle_deltas.values())
        if not deltas:
            return self._last_lifecycle_aggregated

        total_peers = len(deltas)
        model_counts: Dict[str, int] = {}
        model_access_rates: Dict[str, List[float]] = {}

        for d in deltas:
            for model_name, stats in d.get('models', {}).items():
                model_counts[model_name] = model_counts.get(model_name, 0) + 1
                rate = stats.get('access_rate', 0)
                if isinstance(rate, (int, float)):
                    model_access_rates.setdefault(model_name, []).append(rate)

        popularity = {}
        for name, count in model_counts.items():
            peer_fraction = count / max(1, total_peers)
            rates = model_access_rates.get(name, [0])
            avg_rate = sum(rates) / max(1, len(rates))
            popularity[name] = min(1.0, peer_fraction * (1 + avg_rate))

        result = {'popularity': popularity, 'peer_count': total_peers}
        self._last_lifecycle_aggregated = result
        return result

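    # Worked example of the popularity score (illustrative numbers): a model
    # reported by 3 of 4 peers with access rates [0.2, 0.4, 0.6] gets
    # peer_fraction = 0.75, avg_rate = 0.4 and popularity =
    # min(1.0, 0.75 * (1 + 0.4)) = 1.0 (capped from 1.05). A model seen on
    # only 1 of 4 peers with access rate 0.1 scores min(1.0, 0.25 * 1.1) = 0.275.
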

    def get_lifecycle_stats(self) -> dict:
        """Return model lifecycle delta stats for dashboard."""
        with self._lifecycle_lock:
            pending = len(self._lifecycle_deltas)
        return {
            'pending_deltas': pending,
            'last_aggregated': self._last_lifecycle_aggregated,
        }

    # ─── Resonance Tuning Delta Channel ───

    def receive_resonance_delta(self, node_id: str, delta: dict):
        """Store anonymized resonance tuning stats from a peer node."""
        if not node_id or not isinstance(delta, dict):
            return
        with self._resonance_lock:
            self._resonance_deltas[node_id] = delta

    def aggregate_resonance(self) -> Optional[dict]:
        """Aggregate resonance deltas: weighted avg of tuning distributions."""
        with self._resonance_lock:
            deltas = list(self._resonance_deltas.values())
        if not deltas:
            return None

        # Weighted by user_count (more users = more representative)
        weights = []
        for d in deltas:
            uc = d.get('user_count', 1)
            weights.append(max(1.0, float(uc)))
        total_w = sum(weights)

        n_dims = len(deltas[0].get('avg_tuning', []))
        if n_dims == 0:
            return None

        avg_tuning = [0.0] * n_dims
        for d, w in zip(deltas, weights):
            at = d.get('avg_tuning', [0.5] * n_dims)
            for i in range(min(n_dims, len(at))):
                avg_tuning[i] += at[i] * w / total_w

        result = {
            'avg_tuning': avg_tuning,
            'peer_count': len(deltas),
            'total_users': sum(d.get('user_count', 0) for d in deltas),
            'total_interactions': sum(d.get('total_interactions', 0) for d in deltas),
            'timestamp': time.time(),
        }
        self._last_resonance_aggregated = result
        self._resonance_epoch += 1
        return result

    def resonance_tick(self) -> dict:
        """Resonance channel tick: extract local → aggregate → apply → clear."""
        result = {'resonance_epoch': self._resonance_epoch, 'aggregated': False}
        try:
            # Extract local resonance delta
            try:
                from core.resonance_tuner import get_resonance_tuner
                tuner = get_resonance_tuner()
                local_delta = tuner.export_resonance_delta()
                if local_delta:
                    # Broadcast to peers (piggyback on existing gossip)
                    self._broadcast_resonance(local_delta)
            except ImportError:
                pass

            aggregated = self.aggregate_resonance()
            if aggregated:
                # Apply hive-aggregated tuning to local profiles
                try:
                    from core.resonance_tuner import get_resonance_tuner
                    get_resonance_tuner().import_hive_resonance(aggregated)
                except ImportError:
                    pass

                result.update({
                    'aggregated': True,
                    'resonance_epoch': self._resonance_epoch,
                    'peer_count': aggregated.get('peer_count', 0),
                    'total_users': aggregated.get('total_users', 0),
                })
                with self._resonance_lock:
                    self._resonance_deltas.clear()
        except Exception as e:
            result['error'] = str(e)
        return result

    def _broadcast_resonance(self, delta: dict):
        """Broadcast resonance delta to peers via gossip."""
        try:
            from integrations.social.peer_discovery import gossip
            gossip.broadcast({
                'type': 'resonance_delta',
                'delta': delta,
                'timestamp': time.time(),
            })
        except Exception:
            pass

    def get_resonance_stats(self) -> dict:
        """Return resonance channel stats for dashboard."""
        with self._resonance_lock:
            pending = len(self._resonance_deltas)
        return {
            'resonance_epoch': self._resonance_epoch,
            'pending_deltas': pending,
            'last_aggregated': self._last_resonance_aggregated,
        }

    # ─── EventBus Integration ───

    def _subscribe_to_eventbus(self):
        """Subscribe to EventBus events so learning signals flow into federation.

        Events consumed: inference.completed, resonance.tuned, memory.item_added,
        action_state.changed. Counters are included in the next extract_local_delta().
        """
        try:
            from core.platform.events import emit_event
            from core.platform.registry import get_registry
            registry = get_registry()
            if not registry.has('events'):
                return
            bus = registry.get('events')
            bus.on('inference.completed', self._on_event)
            bus.on('resonance.tuned', self._on_event)
            bus.on('memory.item_added', self._on_event)
            bus.on('action_state.changed', self._on_event)
        except Exception:
            pass  # Platform not bootstrapped yet — will be wired on next tick

    def _on_event(self, topic: str, data):
        """Accumulate event counts for federation delta."""
        with self._event_counters_lock:
            self._event_counters[topic] = self._event_counters.get(topic, 0) + 1

    def get_event_counters(self) -> dict:
        """Return and reset event counters for inclusion in federation delta."""
        with self._event_counters_lock:
            counters = dict(self._event_counters)
            self._event_counters.clear()
            return counters


    # ─── Recipe Sharing Channel ───

    def receive_recipe_delta(self, node_id: str, delta: dict):
        """Store recipe catalog summary from a peer node.

        Delta format: {recipes: [{id, name, action_count, success_rate, reuse_count}]}
        No proprietary data — just catalog metadata for discovery.
        """
        if not node_id or not isinstance(delta, dict):
            return

        # Check consent for recipe sharing (best-effort, fail-open)
        user_id = delta.get('user_id', '')
        if user_id:
            try:
                from integrations.social.consent_service import ConsentService
                from integrations.social.models import db_session
                with db_session() as db:
                    if not ConsentService.check_consent(db, user_id, 'public_exposure'):
                        logger.debug(f"Recipe delta from {node_id} blocked: user {user_id} has not consented")
                        return
            except (ImportError, ValueError, Exception):
                pass  # consent service unavailable — allow (fail-open for dev)

        with self._recipe_lock:
            self._recipe_deltas[node_id] = delta

    def aggregate_recipes(self) -> Optional[dict]:
        """Aggregate recipe catalogs — build hive recipe index.

        Every node's recipes are equally discoverable. No node gets priority
        in the index regardless of its hardware tier.
        """
        with self._recipe_lock:
            deltas = list(self._recipe_deltas.values())
        if not deltas:
            return self._last_recipe_aggregated

        # Build unified catalog — every recipe listed equally
        hive_recipes = {}
        for d in deltas:
            node_id = d.get('node_id', 'unknown')
            for recipe in d.get('recipes', []):
                rid = recipe.get('id', '')
                if rid:
                    if rid not in hive_recipes:
                        hive_recipes[rid] = {
                            'id': rid,
                            'name': recipe.get('name', ''),
                            'action_count': recipe.get('action_count', 0),
                            'nodes': [],
                            'total_reuse_count': 0,
                            'avg_success_rate': 0.0,
                        }
                    entry = hive_recipes[rid]
                    entry['nodes'].append(node_id)
                    entry['total_reuse_count'] += recipe.get('reuse_count', 0)
                    # Running average of success rates
                    n = len(entry['nodes'])
                    old_avg = entry['avg_success_rate']
                    new_rate = recipe.get('success_rate', 0.0)
                    entry['avg_success_rate'] = old_avg + (new_rate - old_avg) / n

        result = {
            'recipes': list(hive_recipes.values()),
            'total_recipes': len(hive_recipes),
            'peer_count': len(deltas),
            'timestamp': time.time(),
        }
        self._last_recipe_aggregated = result
        return result

    def get_recipe_stats(self) -> dict:
        """Return recipe sharing stats for dashboard."""
        with self._recipe_lock:
            pending = len(self._recipe_deltas)
        return {
            'pending_deltas': pending,
            'last_aggregated': self._last_recipe_aggregated,
        }

    def get_stats(self) -> dict:
        """Return federation stats for dashboard."""
        with self._lock:
            peer_count = len(self._peer_deltas)
        stats = {
            'epoch': self._epoch,
            'peer_count': peer_count,
            'convergence': self._convergence_history[-1] if self._convergence_history else 0.0,
            'convergence_history': self._convergence_history[-10:],
            'last_aggregated': self._last_aggregated,
        }
        # Include embedding stats
        try:
            stats['embedding'] = self.get_embedding_stats()
        except Exception:
            pass
        # Include model lifecycle stats
        try:
            stats['lifecycle'] = self.get_lifecycle_stats()
        except Exception:
            pass
        # Include resonance stats
        try:
            stats['resonance'] = self.get_resonance_stats()
        except Exception:
            pass
        # Include recipe sharing stats
        try:
            stats['recipes'] = self.get_recipe_stats()
        except Exception:
            pass
        return stats


    # ── Node bootstrapping — help new nodes become better ──

    def bootstrap_new_node(self, node_id: str) -> dict:
        """Share aggregated learning with a newly joined node.

        The flywheel helps every node improve — not just extract compute.
        Pre-trusted nodes share:
        - Aggregated benchmarks (what tools work best for what tasks)
        - Recipe index (trained task patterns for REUSE mode)
        - Quality metrics (community-validated heuristics)
        - Resonance baseline (federated personality norms)

        What is NOT shared:
        - Raw user data (EDGE_ONLY — never leaves device)
        - PII or secrets (DLP + ScopeGuard blocks)
        - Raw weights (only non-interpretable LoRA deltas in Phase 2)
        - Individual conversation history

        Returns a bootstrap package for the new node.
        """
        package = {
            'type': 'node_bootstrap',
            'from_node': '',
            'for_node': node_id,
            'timestamp': time.time(),
        }

        # Aggregated benchmarks — what the hive has learned about tool performance
        package['benchmarks'] = self._get_benchmark_results()

        # Recipe index — trained task patterns (metadata only, not full recipes)
        try:
            package['recipe_index'] = self.get_recipe_stats()
        except Exception:
            package['recipe_index'] = {}

        # Quality heuristics — community-validated metrics
        try:
            if self._peer_deltas:
                quality = {}
                for d in self._peer_deltas.values():
                    qm = d.get('quality_metrics', {})
                    for k, v in qm.items():
                        if isinstance(v, (int, float)):
                            quality.setdefault(k, []).append(v)
                package['quality_baselines'] = {
                    k: sum(v) / len(v) for k, v in quality.items() if v
                }
            else:
                package['quality_baselines'] = {}
        except Exception:
            package['quality_baselines'] = {}

        # Resonance norms — federated personality baselines (aggregate only)
        try:
            package['resonance_norms'] = self.get_resonance_stats()
        except Exception:
            package['resonance_norms'] = {}

        # ScopeGuard: verify nothing private leaks in bootstrap
        try:
            from security.edge_privacy import get_scope_guard, PrivacyScope
            guard = get_scope_guard()
            tagged = dict(package, _privacy_scope=PrivacyScope.FEDERATED)
            allowed, reason = guard.check_egress(
                tagged, PrivacyScope.FEDERATED,
                context={'source': 'node_bootstrap', 'target_node': node_id}
            )
            if not allowed:
                logger.warning(f"Bootstrap blocked by ScopeGuard: {reason}")
                return {'error': reason}
        except ImportError:
            pass

        logger.info(f"Bootstrap package for node {node_id}: "
                    f"{len(package.get('benchmarks', {}))} benchmarks, "
                    f"{package.get('recipe_index', {}).get('total_recipes', 0)} recipes")
        return package


# ─── Singleton ───
_aggregator = None
_aggregator_lock = threading.Lock()


def get_federated_aggregator() -> FederatedAggregator:
    global _aggregator
    if _aggregator is None:
        with _aggregator_lock:
            if _aggregator is None:
                _aggregator = FederatedAggregator()
    return _aggregator
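

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It shows how a
# caller such as AgentDaemon might drive the gossip cycle, and how an HTTP
# handler could hand an incoming peer delta to receive_peer_delta(). The
# function names below and the shape of the HTTP payload are assumptions for
# illustration only.
# ---------------------------------------------------------------------------
def _example_daemon_tick() -> dict:
    """Sketch: one federation cycle, as the daemon would run it every 2nd tick."""
    aggregator = get_federated_aggregator()
    result = aggregator.tick()
    if result.get('alarm'):
        # Three or more consecutive failing epochs: surface the sustained outage.
        logger.error('federation unhealthy: %s', result.get('error'))
    return result


def _example_receive_endpoint(payload: dict) -> dict:
    """Sketch: what a Flask endpoint body could do with a posted peer delta."""
    accepted, reason = get_federated_aggregator().receive_peer_delta(payload)
    return {'accepted': accepted, 'reason': reason}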