Coverage for integrations / agent_engine / federated_aggregator.py: 83.0%
569 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Unified Agent Goal Engine - Federated Learning Delta Aggregation
4Periodic aggregation of learning metrics across HART nodes via gossip.
5Complementary to HiveMind's inference-time tensor fusion — this handles
6training-time metric synchronization.
8Lifecycle (driven by AgentDaemon._tick every 2nd tick):
9 1. extract_local_delta() — pull metrics from WorldModelBridge
10 2. broadcast_delta() — sign + POST to peers
11 3. receive_peer_delta() — called by Flask endpoint
12 4. aggregate() — weighted FedAvg on metrics
13 5. apply_aggregated() — store for dashboard + benchmark consumption
14 6. track_convergence() — variance-based convergence score
15"""
16import hashlib
17import hmac
18import json
19import logging
20import math
21import os
22import threading
23import time
24from typing import Dict, List, Optional, Tuple
logger = logging.getLogger('hevolve_social')

# Federation delta schema version — receive_peer_delta() rejects mismatches.
DELTA_VERSION = 1
DELTA_MAX_AGE_SECONDS = 3600  # 1 hour freshness window

# ── G8: Per-node HMAC secret (generated at first boot) ──
# Default to user-writable dir — installed builds at Program Files are read-only.
# HEVOLVE_AGENT_DATA overrides the default ~/.nunba/agent_data location.
_HMAC_SECRET_PATH = os.path.join(
    os.environ.get('HEVOLVE_AGENT_DATA',
                   os.path.join(os.path.expanduser('~'), '.nunba', 'agent_data')),
    '.hmac_secret')
def _load_or_create_hmac_secret() -> str:
    """Load the per-node HMAC secret from disk, generating it on first boot.

    The secret is 32 random bytes (hex-encoded, 64 chars) stored at
    agent_data/.hmac_secret. This replaces the old HART_NODE_KEY env var
    and Ed25519-public-key fallback with a proper per-node secret that is
    never transmitted in cleartext.

    Returns the secret string; falls back to an ephemeral (unpersisted)
    secret when the data directory is not writable.
    """
    # Fast path: a previously persisted secret of sane length.
    try:
        if os.path.isfile(_HMAC_SECRET_PATH):
            with open(_HMAC_SECRET_PATH, 'r') as fh:
                stored = fh.read().strip()
            if len(stored) >= 32:
                return stored
    except (OSError, PermissionError) as e:
        logger.warning(f'Cannot read HMAC secret ({e}), regenerating')

    # No usable secret on disk — mint a fresh one.
    fresh = os.urandom(32).hex()
    try:
        os.makedirs(os.path.dirname(_HMAC_SECRET_PATH), exist_ok=True)
        with open(_HMAC_SECRET_PATH, 'w') as fh:
            fh.write(fresh)
        # Restrict permissions (owner read/write only).
        try:
            import stat
            os.chmod(_HMAC_SECRET_PATH, stat.S_IRUSR | stat.S_IWUSR)
        except (OSError, NotImplementedError):
            pass  # Windows doesn't support POSIX chmod the same way
        logger.info(f'Generated per-node HMAC secret at {_HMAC_SECRET_PATH}')
    except (OSError, PermissionError) as e:
        logger.warning(f'Cannot persist HMAC secret ({e}), using ephemeral')
    return fresh
# Cache the secret at module load
# (empty sentinel until _get_hmac_secret() lazily populates it).
_NODE_HMAC_SECRET: str = ''
def _get_hmac_secret() -> str:
    """Return the per-node HMAC secret, loading and caching it on first use."""
    global _NODE_HMAC_SECRET
    if _NODE_HMAC_SECRET:
        return _NODE_HMAC_SECRET
    _NODE_HMAC_SECRET = _load_or_create_hmac_secret()
    return _NODE_HMAC_SECRET
def _sign_delta(delta_dict):
    """Sign a federation delta in place with the per-node HMAC-SHA256 secret.

    Uses the persistent per-node secret from agent_data/.hmac_secret
    (G8 fix — replaces the old HART_NODE_KEY env var / Ed25519 public
    key fallback which was a hardcoded/default key vulnerability).

    Mutates *delta_dict* by setting 'hmac_signature' and returns it.
    """
    secret = _get_hmac_secret()
    if not secret:
        logger.error('HMAC secret unavailable — delta UNSIGNED')
        return delta_dict
    # Canonicalize without any pre-existing signature field so that
    # re-signing a previously signed delta yields the same digest.
    unsigned = {k: v for k, v in delta_dict.items() if k != 'hmac_signature'}
    canonical = json.dumps(unsigned, sort_keys=True).encode()
    delta_dict['hmac_signature'] = hmac.new(
        secret.encode(), canonical, hashlib.sha256).hexdigest()
    return delta_dict
def _verify_delta_signature(delta_dict):
    """Verify a received federation delta's HMAC-SHA256 signature.

    G8: Verification requires the sender's HMAC secret, which is exchanged
    during the federation handshake (signed by the node's Ed25519 key).
    Candidate keys are tried in priority order:
      1. Our own per-node secret (self-originated deltas / self-test)
      2. The sender's handshake-exchanged HMAC secret
      3. Legacy: the sender's Ed25519 public key used as an HMAC key

    Returns True on the first matching key, False otherwise.
    """
    sig = delta_dict.get('hmac_signature', '')
    if not sig:
        return False
    body = {k: v for k, v in delta_dict.items() if k != 'hmac_signature'}
    canonical = json.dumps(body, sort_keys=True).encode()

    def _matches(key: str) -> bool:
        # Constant-time comparison to avoid timing side channels.
        digest = hmac.new(key.encode(), canonical, hashlib.sha256).hexdigest()
        return hmac.compare_digest(sig, digest)

    sender = delta_dict.get('node_id', '')
    candidates = [
        _get_hmac_secret(),                                   # 1. own secret
        _get_peer_hmac_secret(sender) if sender else '',      # 2. handshake cache
        delta_dict.get('public_key', ''),                     # 3. legacy fallback
    ]
    for key in candidates:
        if key and _matches(key):
            return True
    return False
# ── Peer HMAC secret exchange cache ──
# node_id → HMAC secret received during the federation handshake.
# Guarded by _peer_hmac_lock: registration (handshake thread) and lookup
# (delta verification) can run concurrently.
_peer_hmac_secrets: Dict[str, str] = {}
_peer_hmac_lock = threading.Lock()
def register_peer_hmac_secret(node_id: str, secret: str):
    """Cache the HMAC secret a peer shared during the federation handshake."""
    _peer_hmac_lock.acquire()
    try:
        _peer_hmac_secrets[node_id] = secret
    finally:
        _peer_hmac_lock.release()
def _get_peer_hmac_secret(node_id: str) -> str:
    """Look up a peer's handshake-exchanged HMAC secret ('' when unknown)."""
    _peer_hmac_lock.acquire()
    try:
        return _peer_hmac_secrets.get(node_id, '')
    finally:
        _peer_hmac_lock.release()
def get_hmac_secret_for_handshake() -> str:
    """Expose our HMAC secret for the federation handshake exchange.

    The caller (federation handshake) should sign this with the node's
    Ed25519 key before transmitting to peers.
    """
    secret = _get_hmac_secret()
    return secret
class FederatedAggregator:
    """Periodic federated learning delta aggregation via gossip.

    Singleton via get_federated_aggregator(). tick() is called by AgentDaemon.
    """

    # Alarm on N consecutive failing epochs. Below this, ticks log at
    # .exception but federation keeps trying — one bad epoch is normal
    # (node churn, transient network, peer restart). At/above this,
    # log.error + consecutive_failure counter flips so a dashboard or
    # operator sees the sustained outage.
    _CONSECUTIVE_FAILURE_ALARM: int = 3
    def __init__(self):
        """Initialize all federation channels and subscribe to the EventBus."""
        # Guards _peer_deltas / _local_delta / _epoch / _convergence_history.
        self._lock = threading.Lock()
        self._peer_deltas: Dict[str, dict] = {}  # node_id → latest delta
        self._local_delta: Optional[dict] = None
        self._epoch = 0
        self._convergence_history: List[float] = []
        self._last_aggregated: Optional[dict] = None
        # Consecutive-tick failure counter — reset on any success. Used
        # by the tick() silent-fail guard to distinguish transient from
        # sustained outages.
        self._consecutive_tick_failures: int = 0
        # Last exception seen per channel, exposed via get_status() so
        # dashboards can show "federation unhealthy: <reason>" instead
        # of an opaque stall.
        self._last_tick_error: Optional[str] = None
        # Exponential backoff for unreachable federation peers
        from core.circuit_breaker import PeerBackoff
        self._peer_backoff = PeerBackoff(initial=10, maximum=300)
        # Embedding delta channel (Phase 1 gradient sync)
        self._embedding_lock = threading.Lock()
        self._embedding_deltas: Dict[str, dict] = {}  # node_id → compressed delta
        self._embedding_epoch = 0
        self._last_embedding_aggregated: Optional[dict] = None
        # Model lifecycle delta channel (dynamic load/unload intelligence)
        self._lifecycle_lock = threading.Lock()
        self._lifecycle_deltas: Dict[str, dict] = {}  # node_id → model usage stats
        self._last_lifecycle_aggregated: Optional[dict] = None
        # Resonance tuning delta channel (personality tuning across nodes)
        self._resonance_lock = threading.Lock()
        self._resonance_deltas: Dict[str, dict] = {}  # node_id → anonymized tuning stats
        self._resonance_epoch = 0
        self._last_resonance_aggregated: Optional[dict] = None
        # Recipe sharing channel (trained task intelligence)
        self._recipe_lock = threading.Lock()
        self._recipe_deltas: Dict[str, dict] = {}  # node_id → recipe catalog
        self._last_recipe_aggregated: Optional[dict] = None
        # EventBus counters (fed by real-time events)
        self._event_counters_lock = threading.Lock()
        self._event_counters: Dict[str, int] = {}

        # Subscribe to EventBus (if platform is bootstrapped)
        self._subscribe_to_eventbus()
    def tick(self) -> dict:
        """Full cycle: extract → broadcast → aggregate → apply → track.

        Failure handling:
          * One failing epoch is logged at .exception (full traceback)
            so operators can correlate with peer-side errors.
          * ``_consecutive_tick_failures`` tracks the run of failures;
            any successful outer try-body resets it. When the count
            hits ``_CONSECUTIVE_FAILURE_ALARM`` (3 by default) we flip
            to log.error level and stamp ``result['alarm']`` so any
            dashboard / metric scraper sees the sustained outage.
          * The tick method itself always returns — callers (AgentDaemon
            tick loop) must be able to skip one bad epoch and try again.

        Returns a result dict: always 'epoch' and 'aggregated'; on success
        also 'convergence'/'peer_count' and per-channel sub-results; on
        failure 'error', 'consecutive_failures' and possibly 'alarm'.
        """
        self._peer_backoff.prune_expired()
        # Determine local node tier once per tick for structured logs.
        try:
            from security.key_delegation import get_node_tier
            node_tier = get_node_tier()
        except Exception:
            node_tier = 'unknown'

        result = {'epoch': self._epoch, 'aggregated': False}
        try:
            # Metric channel: extract our delta and gossip it out.
            self._local_delta = self.extract_local_delta()
            if self._local_delta:
                self.broadcast_delta(self._local_delta)

            # Epoch counter only advances when an aggregation happened.
            aggregated = self.aggregate()
            if aggregated:
                self.apply_aggregated(aggregated)
                convergence = self.track_convergence()
                self._epoch += 1
                result.update({
                    'aggregated': True,
                    'epoch': self._epoch,
                    'convergence': convergence,
                    'peer_count': len(self._peer_deltas),
                })

            # Embedding channel tick (Phase 1 gradient sync)
            embedding_result = self.embedding_tick()
            if embedding_result.get('aggregated'):
                result['embedding'] = embedding_result

            # Resonance channel tick (personality tuning across nodes)
            resonance_result = self.resonance_tick()
            if resonance_result.get('aggregated'):
                result['resonance'] = resonance_result

            # Success — reset the consecutive-failure window.
            if self._consecutive_tick_failures:
                logger.info(
                    f"[FederatedAggregator] tick recovered after "
                    f"{self._consecutive_tick_failures} consecutive failures "
                    f"(epoch={self._epoch}, tier={node_tier})")
            self._consecutive_tick_failures = 0
            self._last_tick_error = None
        except Exception as e:
            self._consecutive_tick_failures += 1
            self._last_tick_error = f"{type(e).__name__}: {e}"
            consecutive = self._consecutive_tick_failures
            result['error'] = str(e)
            result['consecutive_failures'] = consecutive

            # First N-1 failures: .exception (with traceback) at warning
            # level; >= N: .error + alarm flag. Either way, federation
            # keeps trying on the next tick.
            if consecutive >= self._CONSECUTIVE_FAILURE_ALARM:
                result['alarm'] = True
                logger.error(
                    f"[FederatedAggregator] ALARM: {consecutive} consecutive "
                    f"federation ticks failed (epoch={self._epoch}, "
                    f"tier={node_tier}): {e}",
                    exc_info=True)
            else:
                logger.exception(
                    f"[FederatedAggregator] tick failed "
                    f"(consecutive={consecutive}, epoch={self._epoch}, "
                    f"tier={node_tier}): {e}")
        return result
    def extract_local_delta(self) -> Optional[dict]:
        """Pull learning metrics from WorldModelBridge + HiveMind.

        Builds the signed federation delta that broadcast_delta() gossips
        to peers. Every sub-section (identity, guardrail hash, capability
        tier, contribution score) is best-effort: a missing module leaves
        that field at its neutral default instead of failing the tick.

        Returns the delta dict, or None when the bridge is unavailable.
        """
        try:
            from .world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            # NOTE(review): `stats` is never used below — candidate for
            # removal unless get_stats() is called for a side effect; verify.
            stats = bridge.get_stats()
            learning_stats = bridge.get_learning_stats()

            # Get node identity for signing
            node_id = ''
            public_key = ''
            try:
                from security.node_integrity import get_node_identity
                identity = get_node_identity()
                node_id = identity.get('node_id', '')
                public_key = identity.get('public_key', '')
            except Exception:
                pass

            # Get guardrail hash (peers reject mismatched hashes)
            guardrail_hash = ''
            try:
                from security.hive_guardrails import compute_guardrail_hash
                guardrail_hash = compute_guardrail_hash()
            except Exception:
                pass

            # Get capability tier
            capability_tier = 'standard'
            try:
                from security.system_requirements import get_tier_name
                capability_tier = get_tier_name()
            except Exception:
                pass

            # Get contribution score from the social peer DB (0.0 if absent)
            contribution_score = 0.0
            try:
                from integrations.social.models import get_db, PeerNode
                db = get_db()
                try:
                    node = db.query(PeerNode).filter_by(
                        node_id=node_id).first()
                    if node:
                        contribution_score = getattr(
                            node, 'contribution_score', 0.0) or 0.0
                finally:
                    db.close()
            except Exception:
                pass

            # Build delta
            hivemind_stats = learning_stats.get('hivemind', {})
            bridge_stats = learning_stats.get('bridge', {})

            delta = {
                'version': DELTA_VERSION,
                'node_id': node_id,
                'public_key': public_key,
                'guardrail_hash': guardrail_hash,
                'timestamp': time.time(),
                'experience_stats': {
                    'total_recorded': bridge_stats.get('total_recorded', 0),
                    'total_flushed': bridge_stats.get('total_flushed', 0),
                    # Ratio of flushed to recorded; max(1, ...) avoids /0.
                    'flush_rate': (
                        bridge_stats.get('total_flushed', 0) /
                        max(1, bridge_stats.get('total_recorded', 1))
                    ),
                },
                'ralt_stats': {
                    'skills_distributed': bridge_stats.get(
                        'total_skills_distributed', 0),
                    'skills_blocked': bridge_stats.get(
                        'total_skills_blocked', 0),
                    # Fraction of skills accepted out of all considered.
                    'acceptance_rate': (
                        bridge_stats.get('total_skills_distributed', 0) /
                        max(1, bridge_stats.get('total_skills_distributed', 0) +
                            bridge_stats.get('total_skills_blocked', 0))
                    ),
                },
                'hivemind_state': {
                    'agent_count': hivemind_stats.get('agent_count', 0),
                    'total_queries': bridge_stats.get(
                        'total_hivemind_queries', 0),
                    'avg_fusion_latency_ms': hivemind_stats.get(
                        'avg_fusion_latency_ms', 0),
                },
                'quality_metrics': {
                    'correction_density': bridge_stats.get(
                        'total_corrections', 0),
                    # NOTE(review): success_rate / goal_throughput are
                    # placeholders — always 0 here; confirm a later phase
                    # populates them before peers rely on these fields.
                    'success_rate': 0.0,
                    'goal_throughput': 0,
                },
                'benchmark_results': self._get_benchmark_results(),
                'capability_tier': capability_tier,
                'contribution_score': contribution_score,
                'event_counters': self.get_event_counters(),
            }

            # Sign the delta (Ed25519; '' when signing is unavailable)
            try:
                from security.node_integrity import sign_json_payload
                delta['signature'] = sign_json_payload(delta)
            except Exception:
                delta['signature'] = ''

            return delta
        except Exception as e:
            logger.debug(f"Federation extract error: {e}")
            return None
423 def _get_benchmark_results(self) -> dict:
424 """Pull latest benchmark results if BenchmarkRegistry exists."""
425 results = {}
426 try:
427 from .benchmark_registry import get_benchmark_registry
428 registry = get_benchmark_registry()
429 results = registry.get_latest_results()
430 except Exception:
431 pass
433 # Include coding agent benchmarks for hive tool routing intelligence
434 try:
435 from integrations.coding_agent.benchmark_tracker import get_benchmark_tracker
436 coding_delta = get_benchmark_tracker().export_learning_delta()
437 if coding_delta:
438 results['coding_benchmarks'] = coding_delta.get('coding_benchmarks', {})
439 except Exception:
440 pass
442 return results
    def broadcast_delta(self, delta: dict):
        """POST delta to all known active peers.

        Privacy enforcement: ScopeGuard.check_egress() runs before any data
        leaves this node. Only FEDERATED-scoped aggregate stats are sent.
        Raw user data, PII, and secrets are structurally blocked.

        Side effects: mutates *delta* in place (HMAC signature, optional
        origin attestation) and updates per-peer backoff state.
        """
        # ── Edge privacy gate: block PII / secrets from leaving ──
        try:
            from security.edge_privacy import get_scope_guard, PrivacyScope
            guard = get_scope_guard()
            tagged_delta = dict(delta, _privacy_scope=PrivacyScope.FEDERATED)
            allowed, reason = guard.check_egress(
                tagged_delta, PrivacyScope.FEDERATED,
                context={'source': 'federation_broadcast'}
            )
            if not allowed:
                logger.warning(f"Federation broadcast blocked by ScopeGuard: {reason}")
                return
        except ImportError:
            pass  # edge_privacy not available — proceed (defense in depth below)

        # Sign the delta with HMAC-SHA256 before broadcasting
        _sign_delta(delta)

        # Attach origin attestation so peers can verify we're genuine HART OS
        try:
            from security.origin_attestation import get_attestation_for_federation
            att = get_attestation_for_federation()
            if att.get('valid'):
                delta['origin_attestation'] = att['attestation']
        except Exception:
            pass

        try:
            from integrations.social.models import get_db, PeerNode
            from core.http_pool import pooled_post

            db = get_db()
            try:
                # Get our own backend port to detect self-connections
                try:
                    from core.port_registry import get_port
                    _own_port = get_port('backend')
                except Exception:
                    _own_port = 6777  # default backend port fallback
                _self_urls = {
                    f'http://localhost:{_own_port}',
                    f'http://127.0.0.1:{_own_port}',
                    f'http://0.0.0.0:{_own_port}',
                }

                peers = db.query(PeerNode).filter_by(status='active').all()
                now = time.time()
                for peer in peers:
                    # Skip peers without a URL and our own delta's origin.
                    if not peer.url or peer.node_id == delta.get('node_id'):
                        continue
                    _peer_url = peer.url.rstrip('/')
                    # Skip our own node (bundled mode has no HTTP listener)
                    if _peer_url in _self_urls:
                        continue
                    # Honor exponential backoff for unreachable peers.
                    if self._peer_backoff.is_backed_off(_peer_url):
                        continue
                    try:
                        url = f"{_peer_url}/api/social/peers/federation-delta"
                        pooled_post(url, json=delta, timeout=5)
                        self._peer_backoff.record_success(_peer_url)
                    except Exception:
                        self._peer_backoff.record_failure(_peer_url)
            finally:
                db.close()
        except Exception as e:
            logger.debug(f"Federation broadcast error: {e}")
    def receive_peer_delta(self, delta: dict) -> Tuple[bool, str]:
        """Validate and store incoming peer delta.

        Validates, in order: payload type, schema version, freshness,
        guardrail hash, Ed25519 signature, HMAC signature, origin
        attestation, then any embedded master-signed revocation.

        Returns (accepted, reason). On acceptance the delta is stored in
        _peer_deltas keyed by node_id, overwriting that peer's prior delta.
        """
        if not isinstance(delta, dict):
            return False, 'invalid payload'

        if delta.get('version') != DELTA_VERSION:
            return False, f'version mismatch (expected {DELTA_VERSION})'

        # Freshness check — rejects both stale and future-dated deltas.
        ts = delta.get('timestamp', 0)
        if abs(time.time() - ts) > DELTA_MAX_AGE_SECONDS:
            return False, 'delta too old or from the future'

        # Guardrail hash verification — peers must run identical guardrails.
        try:
            from security.hive_guardrails import compute_guardrail_hash
            local_hash = compute_guardrail_hash()
            if delta.get('guardrail_hash') and delta['guardrail_hash'] != local_hash:
                return False, 'guardrail hash mismatch'
        except ImportError:
            pass

        # Ed25519 signature verification — required in hard mode
        from security.master_key import get_enforcement_mode
        _enforcement = get_enforcement_mode()
        sig = delta.get('signature', '')
        if sig:
            try:
                from security.node_integrity import verify_json_signature
                if not verify_json_signature(delta.get('public_key', ''),
                                             delta, sig):
                    return False, 'invalid signature'
            except ImportError:
                logger.warning('Ed25519 verification module unavailable')
                if _enforcement == 'hard':
                    return False, 'Ed25519 module unavailable — cannot verify'
            except Exception as e:
                logger.warning(f'Ed25519 signature verification error: {e}')
                if _enforcement == 'hard':
                    return False, f'signature verification failed: {e}'
        elif _enforcement == 'hard':
            return False, 'missing Ed25519 signature (hard enforcement)'

        # HMAC-SHA256 delta signing verification — required in hard mode
        if delta.get('hmac_signature'):
            if not _verify_delta_signature(delta):
                return False, 'invalid HMAC signature'
        elif _enforcement == 'hard':
            return False, 'missing HMAC signature (hard enforcement)'

        # Origin attestation — reject forks and rebranded builds
        peer_attestation = delta.get('origin_attestation')
        if peer_attestation:
            try:
                from security.origin_attestation import verify_peer_attestation
                att_ok, att_msg = verify_peer_attestation(peer_attestation)
                if not att_ok:
                    return False, f'origin attestation failed: {att_msg}'
            except ImportError:
                pass  # Origin module not available — accept

        # Revocation check — master-key-signed network halt via federation
        revocation = delta.get('revocation')
        if revocation and isinstance(revocation, dict):
            rev_sig = revocation.get('master_signature', '')
            if rev_sig:
                try:
                    from security.master_key import verify_master_signature
                    rev_payload = {k: v for k, v in revocation.items()
                                   if k != 'master_signature'}
                    if verify_master_signature(rev_payload, rev_sig):
                        logger.critical(
                            'REVOCATION received via federation delta — '
                            'tripping circuit breaker: %s',
                            revocation.get('reason', 'no reason'))
                        try:
                            from security.hive_guardrails import HiveCircuitBreaker
                            HiveCircuitBreaker.trip(
                                reason=revocation.get('reason', 'revocation'))
                        except Exception as e:
                            logger.critical(f'Circuit breaker trip failed: {e}')
                        # Revocation short-circuits normal storage.
                        return True, 'revocation accepted'
                    else:
                        logger.warning('Revocation in delta has INVALID '
                                       'master signature — ignoring')
                except ImportError:
                    logger.warning('Cannot verify revocation — '
                                   'security modules missing')

        node_id = delta.get('node_id', '')
        if not node_id:
            return False, 'missing node_id'

        with self._lock:
            self._peer_deltas[node_id] = delta

        return True, 'accepted'
619 def aggregate(self) -> Optional[dict]:
620 """Weighted FedAvg across all peer deltas + local delta."""
621 with self._lock:
622 all_deltas = list(self._peer_deltas.values())
623 if self._local_delta:
624 all_deltas.append(self._local_delta)
626 if len(all_deltas) < 1:
627 return None
629 # Equal voice: every node's intelligence counts the same.
630 # Weight by data quality (interactions observed) not hardware tier.
631 # A Raspberry Pi that served 10,000 users has more insight than
632 # a GPU server that served 10. No one entity owns the built
633 # intelligence — everyone is equal for this hive being.
634 weights = []
635 for d in all_deltas:
636 interactions = (
637 d.get('experience_stats', {}).get('total_recorded', 0) +
638 d.get('quality_metrics', {}).get('goal_throughput', 0)
639 )
640 # Weight by log of interactions — diminishing returns prevents
641 # any single high-traffic node from dominating
642 w = math.log1p(max(0, interactions))
643 weights.append(max(1.0, w)) # Floor at 1.0 — every node counts
645 total_weight = sum(weights)
647 # Weighted average of numeric metrics
648 aggregated = {
649 'epoch': self._epoch + 1,
650 'peer_count': len(all_deltas),
651 'timestamp': time.time(),
652 'experience_stats': self._weighted_avg_dict(
653 [d.get('experience_stats', {}) for d in all_deltas], weights, total_weight),
654 'ralt_stats': self._weighted_avg_dict(
655 [d.get('ralt_stats', {}) for d in all_deltas], weights, total_weight),
656 'hivemind_state': self._weighted_avg_dict(
657 [d.get('hivemind_state', {}) for d in all_deltas], weights, total_weight),
658 'quality_metrics': self._weighted_avg_dict(
659 [d.get('quality_metrics', {}) for d in all_deltas], weights, total_weight),
660 }
661 return aggregated
663 def _weighted_avg_dict(self, dicts: list, weights: list,
664 total_weight: float) -> dict:
665 """Compute weighted average of numeric values in list of dicts."""
666 result = {}
667 if not dicts:
668 return result
669 keys = set()
670 for d in dicts:
671 keys.update(d.keys())
672 for key in keys:
673 vals = []
674 ws = []
675 for d, w in zip(dicts, weights):
676 v = d.get(key)
677 if isinstance(v, (int, float)):
678 vals.append(v)
679 ws.append(w)
680 if vals:
681 result[key] = sum(v * w for v, w in zip(vals, ws)) / max(1e-10, sum(ws))
682 return result
684 def apply_aggregated(self, aggregated: dict):
685 """Store aggregated metrics locally for dashboard + benchmark consumption.
687 Single code path: routes through WorldModelBridge.apply_federation_update()
688 which owns storage AND the EventBus emit. This prevents the earlier
689 dead path where apply_aggregated mutated bridge._federation_aggregated
690 directly and emitted 'federation.aggregated' on its own, while the
691 bridge's apply_federation_update() was never called by any caller.
692 """
693 self._last_aggregated = aggregated
694 try:
695 from .world_model_bridge import get_world_model_bridge
696 bridge = get_world_model_bridge()
697 bridge.apply_federation_update(aggregated)
698 except Exception as exc:
699 logger.debug(f"[FederatedAggregator] bridge.apply_federation_update failed: {exc}")
701 # Feed hive-aggregated coding benchmarks back to local tool router
702 coding_data = aggregated.get('benchmark_results', {}).get('coding_benchmarks')
703 if coding_data:
704 try:
705 from integrations.coding_agent.benchmark_tracker import get_benchmark_tracker
706 get_benchmark_tracker().import_hive_delta({'coding_benchmarks': coding_data})
707 except Exception:
708 pass
710 def track_convergence(self) -> float:
711 """Variance-based convergence score across peer deltas.
713 Lower variance = higher convergence. Returns 0.0-1.0.
714 """
715 with self._lock:
716 deltas = list(self._peer_deltas.values())
718 if len(deltas) < 2:
719 score = 1.0
720 else:
721 # Use flush_rate variance as proxy
722 rates = [
723 d.get('experience_stats', {}).get('flush_rate', 0)
724 for d in deltas
725 ]
726 mean_rate = sum(rates) / len(rates)
727 variance = sum((r - mean_rate) ** 2 for r in rates) / len(rates)
728 score = 1.0 / (1.0 + variance * 100)
730 self._convergence_history.append(score)
731 if len(self._convergence_history) > 100:
732 self._convergence_history = self._convergence_history[-100:]
733 return score
735 # ─── Embedding Delta Channel (Phase 1 Gradient Sync) ───
737 def receive_embedding_delta(self, node_id: str, delta: dict):
738 """Store a compressed embedding delta from a peer node."""
739 if not node_id or not isinstance(delta, dict):
740 return
741 with self._embedding_lock:
742 self._embedding_deltas[node_id] = delta
744 def aggregate_embeddings(self) -> Optional[dict]:
745 """Aggregate all embedding deltas using trimmed mean."""
746 with self._embedding_lock:
747 deltas = list(self._embedding_deltas.values())
748 if not deltas:
749 return None
751 try:
752 from .embedding_delta import trimmed_mean_aggregate
753 weights = []
754 for d in deltas:
755 cs = d.get('contribution_score', 1.0)
756 weights.append(max(0.01, cs if isinstance(cs, (int, float)) else 1.0))
758 aggregated = trimmed_mean_aggregate(deltas, weights=weights)
759 self._last_embedding_aggregated = aggregated
760 self._embedding_epoch += 1
761 return aggregated
762 except Exception as e:
763 logger.debug(f"Embedding aggregation error: {e}")
764 return None
766 def embedding_tick(self) -> dict:
767 """Embedding channel tick: aggregate + clear stale deltas."""
768 result = {'embedding_epoch': self._embedding_epoch, 'aggregated': False}
769 try:
770 aggregated = self.aggregate_embeddings()
771 if aggregated:
772 result.update({
773 'aggregated': True,
774 'embedding_epoch': self._embedding_epoch,
775 'peer_count': aggregated.get('peer_count', 0),
776 'outliers_removed': aggregated.get('outliers_removed', 0),
777 })
778 # Clear processed deltas
779 with self._embedding_lock:
780 self._embedding_deltas.clear()
781 except Exception as e:
782 result['error'] = str(e)
783 return result
785 def get_embedding_stats(self) -> dict:
786 """Return embedding sync stats for dashboard."""
787 with self._embedding_lock:
788 pending = len(self._embedding_deltas)
789 return {
790 'embedding_epoch': self._embedding_epoch,
791 'pending_deltas': pending,
792 'last_aggregated': self._last_embedding_aggregated,
793 }
795 # ─── Model Lifecycle Delta Channel ───
797 def receive_lifecycle_delta(self, node_id: str, delta: dict):
798 """Store model usage stats from a peer node."""
799 if not node_id or not isinstance(delta, dict):
800 return
801 with self._lifecycle_lock:
802 self._lifecycle_deltas[node_id] = delta
804 def aggregate_lifecycle(self) -> Optional[dict]:
805 """Aggregate model popularity across all peers.
807 Returns: {popularity: {model_name: 0.0-1.0}, peer_count: int}
808 """
809 with self._lifecycle_lock:
810 deltas = list(self._lifecycle_deltas.values())
811 if not deltas:
812 return self._last_lifecycle_aggregated
814 total_peers = len(deltas)
815 model_counts: Dict[str, int] = {}
816 model_access_rates: Dict[str, List[float]] = {}
818 for d in deltas:
819 for model_name, stats in d.get('models', {}).items():
820 model_counts[model_name] = model_counts.get(model_name, 0) + 1
821 rate = stats.get('access_rate', 0)
822 if isinstance(rate, (int, float)):
823 model_access_rates.setdefault(model_name, []).append(rate)
825 popularity = {}
826 for name, count in model_counts.items():
827 peer_fraction = count / max(1, total_peers)
828 rates = model_access_rates.get(name, [0])
829 avg_rate = sum(rates) / max(1, len(rates))
830 popularity[name] = min(1.0, peer_fraction * (1 + avg_rate))
832 result = {'popularity': popularity, 'peer_count': total_peers}
833 self._last_lifecycle_aggregated = result
834 return result
836 def get_lifecycle_stats(self) -> dict:
837 """Return model lifecycle delta stats for dashboard."""
838 with self._lifecycle_lock:
839 pending = len(self._lifecycle_deltas)
840 return {
841 'pending_deltas': pending,
842 'last_aggregated': self._last_lifecycle_aggregated,
843 }
845 # ─── Resonance Tuning Delta Channel ───
847 def receive_resonance_delta(self, node_id: str, delta: dict):
848 """Store anonymized resonance tuning stats from a peer node."""
849 if not node_id or not isinstance(delta, dict):
850 return
851 with self._resonance_lock:
852 self._resonance_deltas[node_id] = delta
854 def aggregate_resonance(self) -> Optional[dict]:
855 """Aggregate resonance deltas: weighted avg of tuning distributions."""
856 with self._resonance_lock:
857 deltas = list(self._resonance_deltas.values())
858 if not deltas:
859 return None
861 # Weighted by user_count (more users = more representative)
862 weights = []
863 for d in deltas:
864 uc = d.get('user_count', 1)
865 weights.append(max(1.0, float(uc)))
866 total_w = sum(weights)
868 n_dims = len(deltas[0].get('avg_tuning', []))
869 if n_dims == 0:
870 return None
872 avg_tuning = [0.0] * n_dims
873 for d, w in zip(deltas, weights):
874 at = d.get('avg_tuning', [0.5] * n_dims)
875 for i in range(min(n_dims, len(at))):
876 avg_tuning[i] += at[i] * w / total_w
878 result = {
879 'avg_tuning': avg_tuning,
880 'peer_count': len(deltas),
881 'total_users': sum(d.get('user_count', 0) for d in deltas),
882 'total_interactions': sum(d.get('total_interactions', 0) for d in deltas),
883 'timestamp': time.time(),
884 }
885 self._last_resonance_aggregated = result
886 self._resonance_epoch += 1
887 return result
889 def resonance_tick(self) -> dict:
890 """Resonance channel tick: extract local → aggregate → apply → clear."""
891 result = {'resonance_epoch': self._resonance_epoch, 'aggregated': False}
892 try:
893 # Extract local resonance delta
894 try:
895 from core.resonance_tuner import get_resonance_tuner
896 tuner = get_resonance_tuner()
897 local_delta = tuner.export_resonance_delta()
898 if local_delta:
899 # Broadcast to peers (piggyback on existing gossip)
900 self._broadcast_resonance(local_delta)
901 except ImportError:
902 pass
904 aggregated = self.aggregate_resonance()
905 if aggregated:
906 # Apply hive-aggregated tuning to local profiles
907 try:
908 from core.resonance_tuner import get_resonance_tuner
909 get_resonance_tuner().import_hive_resonance(aggregated)
910 except ImportError:
911 pass
913 result.update({
914 'aggregated': True,
915 'resonance_epoch': self._resonance_epoch,
916 'peer_count': aggregated.get('peer_count', 0),
917 'total_users': aggregated.get('total_users', 0),
918 })
919 with self._resonance_lock:
920 self._resonance_deltas.clear()
921 except Exception as e:
922 result['error'] = str(e)
923 return result
925 def _broadcast_resonance(self, delta: dict):
926 """Broadcast resonance delta to peers via gossip."""
927 try:
928 from integrations.social.peer_discovery import gossip
929 gossip.broadcast({
930 'type': 'resonance_delta',
931 'delta': delta,
932 'timestamp': time.time(),
933 })
934 except Exception:
935 pass
937 def get_resonance_stats(self) -> dict:
938 """Return resonance channel stats for dashboard."""
939 with self._resonance_lock:
940 pending = len(self._resonance_deltas)
941 return {
942 'resonance_epoch': self._resonance_epoch,
943 'pending_deltas': pending,
944 'last_aggregated': self._last_resonance_aggregated,
945 }
947 # ─── EventBus Integration ───
949 def _subscribe_to_eventbus(self):
950 """Subscribe to EventBus events so learning signals flow into federation.
952 Events consumed: inference.completed, resonance.tuned, memory.item_added,
953 action_state.changed. Counters are included in the next extract_local_delta().
954 """
955 try:
956 from core.platform.events import emit_event
957 from core.platform.registry import get_registry
958 registry = get_registry()
959 if not registry.has('events'):
960 return
961 bus = registry.get('events')
962 bus.on('inference.completed', self._on_event)
963 bus.on('resonance.tuned', self._on_event)
964 bus.on('memory.item_added', self._on_event)
965 bus.on('action_state.changed', self._on_event)
966 except Exception:
967 pass # Platform not bootstrapped yet — will be wired on next tick
969 def _on_event(self, topic: str, data):
970 """Accumulate event counts for federation delta."""
971 with self._event_counters_lock:
972 self._event_counters[topic] = self._event_counters.get(topic, 0) + 1
974 def get_event_counters(self) -> dict:
975 """Return and reset event counters for inclusion in federation delta."""
976 with self._event_counters_lock:
977 counters = dict(self._event_counters)
978 self._event_counters.clear()
979 return counters
981 # ─── Recipe Sharing Channel ───
983 def receive_recipe_delta(self, node_id: str, delta: dict):
984 """Store recipe catalog summary from a peer node.
986 Delta format: {recipes: [{id, name, action_count, success_rate, reuse_count}]}
987 No proprietary data — just catalog metadata for discovery.
988 """
989 if not node_id or not isinstance(delta, dict):
990 return
992 # Check consent for recipe sharing (best-effort, fail-open)
993 user_id = delta.get('user_id', '')
994 if user_id:
995 try:
996 from integrations.social.consent_service import ConsentService
997 from integrations.social.models import db_session
998 with db_session() as db:
999 if not ConsentService.check_consent(db, user_id, 'public_exposure'):
1000 logger.debug(f"Recipe delta from {node_id} blocked: user {user_id} has not consented")
1001 return
1002 except (ImportError, ValueError, Exception):
1003 pass # consent service unavailable — allow (fail-open for dev)
1005 with self._recipe_lock:
1006 self._recipe_deltas[node_id] = delta
1008 def aggregate_recipes(self) -> Optional[dict]:
1009 """Aggregate recipe catalogs — build hive recipe index.
1011 Every node's recipes are equally discoverable. No node gets priority
1012 in the index regardless of its hardware tier.
1013 """
1014 with self._recipe_lock:
1015 deltas = list(self._recipe_deltas.values())
1016 if not deltas:
1017 return self._last_recipe_aggregated
1019 # Build unified catalog — every recipe listed equally
1020 hive_recipes = {}
1021 for d in deltas:
1022 node_id = d.get('node_id', 'unknown')
1023 for recipe in d.get('recipes', []):
1024 rid = recipe.get('id', '')
1025 if rid:
1026 if rid not in hive_recipes:
1027 hive_recipes[rid] = {
1028 'id': rid,
1029 'name': recipe.get('name', ''),
1030 'action_count': recipe.get('action_count', 0),
1031 'nodes': [],
1032 'total_reuse_count': 0,
1033 'avg_success_rate': 0.0,
1034 }
1035 entry = hive_recipes[rid]
1036 entry['nodes'].append(node_id)
1037 entry['total_reuse_count'] += recipe.get('reuse_count', 0)
1038 # Running average of success rates
1039 n = len(entry['nodes'])
1040 old_avg = entry['avg_success_rate']
1041 new_rate = recipe.get('success_rate', 0.0)
1042 entry['avg_success_rate'] = old_avg + (new_rate - old_avg) / n
1044 result = {
1045 'recipes': list(hive_recipes.values()),
1046 'total_recipes': len(hive_recipes),
1047 'peer_count': len(deltas),
1048 'timestamp': time.time(),
1049 }
1050 self._last_recipe_aggregated = result
1051 return result
1053 def get_recipe_stats(self) -> dict:
1054 """Return recipe sharing stats for dashboard."""
1055 with self._recipe_lock:
1056 pending = len(self._recipe_deltas)
1057 return {
1058 'pending_deltas': pending,
1059 'last_aggregated': self._last_recipe_aggregated,
1060 }
1062 def get_stats(self) -> dict:
1063 """Return federation stats for dashboard."""
1064 with self._lock:
1065 peer_count = len(self._peer_deltas)
1066 stats = {
1067 'epoch': self._epoch,
1068 'peer_count': peer_count,
1069 'convergence': self._convergence_history[-1] if self._convergence_history else 0.0,
1070 'convergence_history': self._convergence_history[-10:],
1071 'last_aggregated': self._last_aggregated,
1072 }
1073 # Include embedding stats
1074 try:
1075 stats['embedding'] = self.get_embedding_stats()
1076 except Exception:
1077 pass
1078 # Include model lifecycle stats
1079 try:
1080 stats['lifecycle'] = self.get_lifecycle_stats()
1081 except Exception:
1082 pass
1083 # Include resonance stats
1084 try:
1085 stats['resonance'] = self.get_resonance_stats()
1086 except Exception:
1087 pass
1088 # Include recipe sharing stats
1089 try:
1090 stats['recipes'] = self.get_recipe_stats()
1091 except Exception:
1092 pass
1093 return stats
1095 # ── Node bootstrapping — help new nodes become better ──
1097 def bootstrap_new_node(self, node_id: str) -> dict:
1098 """Share aggregated learning with a newly joined node.
1100 The flywheel helps every node improve — not just extract compute.
1101 Pre-trusted nodes share:
1102 - Aggregated benchmarks (what tools work best for what tasks)
1103 - Recipe index (trained task patterns for REUSE mode)
1104 - Quality metrics (community-validated heuristics)
1105 - Resonance baseline (federated personality norms)
1107 What is NOT shared:
1108 - Raw user data (EDGE_ONLY — never leaves device)
1109 - PII or secrets (DLP + ScopeGuard blocks)
1110 - Raw weights (only non-interpretable LoRA deltas in Phase 2)
1111 - Individual conversation history
1113 Returns a bootstrap package for the new node.
1114 """
1115 package = {
1116 'type': 'node_bootstrap',
1117 'from_node': '',
1118 'for_node': node_id,
1119 'timestamp': time.time(),
1120 }
1122 # Aggregated benchmarks — what the hive has learned about tool performance
1123 package['benchmarks'] = self._get_benchmark_results()
1125 # Recipe index — trained task patterns (metadata only, not full recipes)
1126 try:
1127 package['recipe_index'] = self.get_recipe_stats()
1128 except Exception:
1129 package['recipe_index'] = {}
1131 # Quality heuristics — community-validated metrics
1132 try:
1133 if self.peer_deltas:
1134 quality = {}
1135 for d in self.peer_deltas.values():
1136 qm = d.get('quality_metrics', {})
1137 for k, v in qm.items():
1138 if isinstance(v, (int, float)):
1139 quality.setdefault(k, []).append(v)
1140 package['quality_baselines'] = {
1141 k: sum(v) / len(v) for k, v in quality.items() if v
1142 }
1143 else:
1144 package['quality_baselines'] = {}
1145 except Exception:
1146 package['quality_baselines'] = {}
1148 # Resonance norms — federated personality baselines (aggregate only)
1149 try:
1150 package['resonance_norms'] = self.get_resonance_stats()
1151 except Exception:
1152 package['resonance_norms'] = {}
1154 # ScopeGuard: verify nothing private leaks in bootstrap
1155 try:
1156 from security.edge_privacy import get_scope_guard, PrivacyScope
1157 guard = get_scope_guard()
1158 tagged = dict(package, _privacy_scope=PrivacyScope.FEDERATED)
1159 allowed, reason = guard.check_egress(
1160 tagged, PrivacyScope.FEDERATED,
1161 context={'source': 'node_bootstrap', 'target_node': node_id}
1162 )
1163 if not allowed:
1164 logger.warning(f"Bootstrap blocked by ScopeGuard: {reason}")
1165 return {'error': reason}
1166 except ImportError:
1167 pass
1169 logger.info(f"Bootstrap package for node {node_id}: "
1170 f"{len(package.get('benchmarks', {}))} benchmarks, "
1171 f"{package.get('recipe_index', {}).get('total_recipes', 0)} recipes")
1172 return package
# ─── Singleton ───
# Module-level singleton storage: the instance is created lazily by
# get_federated_aggregator(); the lock guards first construction.
_aggregator = None
_aggregator_lock = threading.Lock()
def get_federated_aggregator() -> FederatedAggregator:
    """Return the process-wide FederatedAggregator, creating it lazily.

    Double-checked locking: the unlocked check keeps the common
    (already-created) path lock-free; the locked re-check prevents two
    threads from both constructing the singleton.
    """
    global _aggregator
    if _aggregator is not None:
        return _aggregator
    with _aggregator_lock:
        if _aggregator is None:
            _aggregator = FederatedAggregator()
    return _aggregator