Coverage for integrations/social/peer_discovery.py: 59.5%
887 statements
« prev ^ index » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Decentralized Gossip Peer Discovery
3Fully decentralized protocol for HevolveBot instances to discover each other.
4No central registry. Peers exchange peer lists via gossip, new nodes propagate automatically.
5"""
6import os
7import uuid
8import time
9import random
10import logging
11import threading
12import requests
13from datetime import datetime, timedelta
15from core.http_pool import pooled_get, pooled_post
17logger = logging.getLogger('hevolve_social')
20def _load_or_create_node_id() -> str:
21 """Persist node_id under platform_paths.get_data_dir() / 'node_id.json'.
23 Returns existing id on subsequent boots so the central side can
24 dedupe joins by node_id. Falls back to a fresh in-memory uuid if
25 the data dir is unwritable (degraded environments such as
26 cx_Freeze read-only mode).
27 """
28 import json
29 try:
30 from core.platform_paths import get_data_dir
31 data_dir = get_data_dir()
32 os.makedirs(data_dir, exist_ok=True)
33 path = os.path.join(data_dir, 'node_id.json')
34 if os.path.exists(path):
35 try:
36 with open(path, 'r', encoding='utf-8') as fh:
37 payload = json.load(fh)
38 nid = payload.get('node_id', '')
39 if nid:
40 return nid
41 except Exception:
42 pass # Corrupt file → regenerate
43 nid = str(uuid.uuid4())
44 try:
45 with open(path, 'w', encoding='utf-8') as fh:
46 json.dump({'node_id': nid, 'created_at': datetime.utcnow().isoformat()}, fh)
47 except Exception:
48 pass # Best-effort persist; in-memory id is still valid for this boot
49 return nid
50 except Exception:
51 return str(uuid.uuid4())
54# ═══════════════════════════════════════════════════════════════════════
55# Bandwidth Profiles - auto-selected by tier, override via env
56# ═══════════════════════════════════════════════════════════════════════
# All intervals/thresholds are in seconds. A profile is selected in
# GossipProtocol.__init__ (via _TIER_BANDWIDTH_MAP or the
# HEVOLVE_GOSSIP_BANDWIDTH env override); each numeric field can still
# be overridden individually by its own HEVOLVE_* env var.
BANDWIDTH_PROFILES = {
    'full': {
        'gossip_interval': 60,       # seconds between gossip rounds
        'health_interval': 120,      # seconds between peer health sweeps
        'gossip_fanout': 3,          # peers contacted per gossip round
        'payload_mode': 'json',  # Full JSON with all fields
        'stale_threshold': 300,      # unreachable this long → status 'stale'
        'dead_threshold': 900,       # unreachable this long → status 'dead'
    },
    'constrained': {
        'gossip_interval': 300,
        'health_interval': 600,
        'gossip_fanout': 2,
        'payload_mode': 'json_compact',  # Stripped optional fields
        'stale_threshold': 900,
        'dead_threshold': 2700,
    },
    'minimal': {
        'gossip_interval': 900,
        'health_interval': 1800,
        'gossip_fanout': 1,
        'payload_mode': 'msgpack',  # ~60% smaller than JSON
        'stale_threshold': 2700,
        'dead_threshold': 7200,
    },
}
85# Bandwidth profile lookup — handles BOTH classification dimensions:
86# 1. Capability tier (NodeTierLevel): embedded, observer, lite, standard, full, compute_host
87# → describes what this node CAN do based on hardware
88# 2. Topology mode (HEVOLVE_NODE_TIER): flat, regional, central
89# → describes WHERE this node sits in the network hierarchy
90# Lookup order: capability_tier first, then topology mode as fallback
91# (see GossipProtocol.__init__: cap_tier or self.tier)
# Every value must be a key of BANDWIDTH_PROFILES above.
_TIER_BANDWIDTH_MAP = {
    # Capability tiers (from security/system_requirements.py NodeTierLevel)
    'embedded': 'minimal',
    'observer': 'constrained',
    'lite': 'constrained',
    'standard': 'full',
    'full': 'full',
    'compute_host': 'full',
    # Topology modes (fallback when capability_tier is not yet resolved)
    'flat': 'full',
    'regional': 'full',
    'central': 'full',
}
106# Compact payload: only essential fields for gossip on constrained links
# Whitelist applied by _gossip_self_info/_gossip_peer_list when the
# active payload_mode is 'json_compact' — anything not listed here is
# dropped from gossip payloads on constrained links.
_COMPACT_FIELDS = frozenset({
    'node_id', 'url', 'public_key', 'guardrail_hash', 'code_hash',
    'signature', 'tier', 'capability_tier', 'timestamp', 'hart_tag',
})
113class GossipProtocol:
114 """Gossip-based peer discovery for HevolveBot network."""
    def __init__(self):
        """Configure identity, tier, bandwidth profile and seed peers.

        Pure configuration — no threads are started and no network is
        touched until ``start()`` is called.
        """
        # Identity — persisted across restarts so the central side can
        # dedupe joins by node_id. Without persistence, every watchdog
        # restart of the gossip thread fabricates a fresh uuid and the
        # central dashboard sees infinite "new node" rows for a single
        # install (witnessed 2026-04-30: same install logged
        # node=a7ba8cc9 then node=d016058b in one server.log).
        self.node_id = _load_or_create_node_id()
        self.node_name = os.environ.get(
            'HEVOLVE_NODE_NAME', f'hevolve-{self.node_id[:8]}')
        from core.port_registry import get_port
        self.base_url = os.environ.get(
            'HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}').rstrip('/')
        self.version = '1.0.0'
        self.started_at = datetime.utcnow()

        # Hierarchy configuration (needed before bandwidth selection)
        try:
            from security.key_delegation import get_node_tier
            self.tier = get_node_tier()
        except ImportError:
            self.tier = 'flat'  # No delegation module → flat topology

        # Bandwidth profile: auto-select from tier, allow env override
        self.bandwidth_profile = os.environ.get('HEVOLVE_GOSSIP_BANDWIDTH', '')
        if not self.bandwidth_profile:
            # Auto-select from capability tier (from system_requirements)
            cap_tier = ''
            try:
                from security.system_requirements import get_capabilities
                caps = get_capabilities()
                if caps:
                    cap_tier = caps.tier.value
            except Exception:
                pass  # Capability probe unavailable → fall back to topology tier
            self.bandwidth_profile = _TIER_BANDWIDTH_MAP.get(
                cap_tier or self.tier, 'full')
        profile = BANDWIDTH_PROFILES.get(self.bandwidth_profile, BANDWIDTH_PROFILES['full'])

        # Configuration - profile defaults, overridable by env
        self.gossip_interval = int(os.environ.get(
            'HEVOLVE_GOSSIP_INTERVAL', str(profile['gossip_interval'])))
        self.health_interval = int(os.environ.get(
            'HEVOLVE_HEALTH_INTERVAL', str(profile['health_interval'])))
        self.stale_threshold = int(os.environ.get(
            'HEVOLVE_STALE_THRESHOLD', str(profile['stale_threshold'])))
        self.dead_threshold = int(os.environ.get(
            'HEVOLVE_DEAD_THRESHOLD', str(profile['dead_threshold'])))
        self.gossip_fanout = int(os.environ.get(
            'HEVOLVE_GOSSIP_FANOUT', str(profile['gossip_fanout'])))
        self.payload_mode = profile['payload_mode']

        self.central_url = os.environ.get('HEVOLVE_CENTRAL_URL', '').rstrip('/')
        self.regional_url = os.environ.get('HEVOLVE_REGIONAL_URL', '').rstrip('/')

        # Parse seed peers — env override + canonical genesis peers.
        # Genesis peers prevent bootstrap poisoning: even if env var is
        # compromised, the node always knows at least the real network.
        # Sourced from `core.superadmins.ALL_CENTRAL_URLS` (primary +
        # fallback) so gossip can find EITHER the .hevolve.ai central
        # OR the azurekong.hertzai.com fallback — single source of
        # truth, no parallel literals that could drift (Gate 4 / DRY).
        try:
            from core.superadmins import ALL_CENTRAL_URLS
            _GENESIS_PEERS = list(ALL_CENTRAL_URLS)
        except Exception:
            # Degraded environment fallback (cx_Freeze import chain race
            # at very-early boot). Mirror the canonical default.
            _GENESIS_PEERS = [
                'https://central.hevolve.ai',
                'https://azurekong.hertzai.com',
            ]
        seed_str = os.environ.get('HEVOLVE_SEED_PEERS', '')
        env_peers = [
            u.strip().rstrip('/') for u in seed_str.split(',')
            if u.strip()
        ]
        # Merge: env peers first (user-specified), then genesis (always present)
        seen = set()
        self.seed_peers = []
        for url in env_peers + _GENESIS_PEERS:
            if url not in seen:
                seen.add(url)
                self.seed_peers.append(url)

        # HART node identity (loaded on start)
        self._hart_tag = ''

        # State
        self._running = False
        self._thread = None
        self._lock = threading.Lock()

        # Exponential backoff for unreachable peers
        from core.circuit_breaker import PeerBackoff
        self._peer_backoff = PeerBackoff(initial=10, maximum=300)

        logger.info(
            f"Gossip bandwidth: {self.bandwidth_profile} "
            f"(gossip={self.gossip_interval}s, health={self.health_interval}s, "
            f"fanout={self.gossip_fanout}, payload={self.payload_mode})"
        )
219 # ─── Peer Backoff (delegates to core.circuit_breaker.PeerBackoff) ───
221 def _is_peer_backed_off(self, peer_url: str) -> bool:
222 return self._peer_backoff.is_backed_off(peer_url)
224 def _record_peer_failure(self, peer_url: str):
225 self._peer_backoff.record_failure(peer_url)
227 def _record_peer_success(self, peer_url: str):
228 self._peer_backoff.record_success(peer_url)
230 # ─── Payload Serialization ───
232 def _gossip_self_info(self):
233 """Return self info appropriate for current bandwidth profile."""
234 info = self._self_info()
235 if self.payload_mode == 'json_compact':
236 return {k: v for k, v in info.items() if k in _COMPACT_FIELDS}
237 return info
239 def _gossip_peer_list(self):
240 """Return peer list appropriate for current bandwidth profile."""
241 peers = self.get_peer_list()
242 if self.payload_mode == 'json_compact':
243 return [{k: v for k, v in p.items() if k in _COMPACT_FIELDS}
244 for p in peers]
245 return peers
247 @staticmethod
248 def _serialize_payload(data) -> bytes:
249 """Serialize payload using msgpack if available, else JSON."""
250 try:
251 import msgpack
252 return msgpack.packb(data, use_bin_type=True)
253 except ImportError:
254 import json
255 return json.dumps(data).encode('utf-8')
257 @staticmethod
258 def _deserialize_payload(raw: bytes):
259 """Deserialize payload from msgpack or JSON."""
260 try:
261 import msgpack
262 return msgpack.unpackb(raw, raw=False)
263 except ImportError:
264 import json
265 return json.loads(raw.decode('utf-8'))
266 except Exception:
267 import json
268 return json.loads(raw.decode('utf-8'))
    def start(self):
        """Load peers from DB, announce to seeds/known peers, start background thread."""
        # Idempotent: a second call while already running is a no-op.
        with self._lock:
            if self._running:
                return
            self._running = True

        # Generate HART node identity if not yet established
        self._ensure_hart_identity()

        # Seed peers into DB
        self._seed_initial_peers()

        # Announce self to all known peers (non-blocking)
        threading.Thread(target=self._announce_to_all, daemon=True).start()

        # Start gossip background loop
        self._thread = threading.Thread(target=self._background_loop, daemon=True)
        self._thread.start()
        logger.info(f"Gossip started: node={self.node_id[:8]}, "
                    f"name={self.node_name}, hart_tag={self._hart_tag}, "
                    f"seeds={len(self.seed_peers)}, "
                    f"bandwidth={self.bandwidth_profile}")

        # Start the central report-in loop. Every install — bundled,
        # Docker, ISO — reports identity to the canonical superadmin
        # allowlist (core.superadmins.SUPERADMIN_CENTRAL_URLS) so the
        # superadmin dashboard sees every node that has ever joined.
        # Offline-tolerant via outbox; cheap when centrals are
        # unreachable (PeerBackoff handles DNS + connect timeouts).
        try:
            from core.superadmin_report import start_background_loop
            start_background_loop(self._self_info)
        except Exception as e:
            logger.debug(f"Superadmin report-in loop not started: {e}")
    def _ensure_hart_identity(self):
        """Generate HART node identity on first startup. Like getting an IP address.

        - Central: picks a unique element → @element
        - Regional: picks a unique spirit → @central.spirit
        - Flat: no node tag needed (users get individual tags)

        Best-effort: any failure (module missing, DB unreadable) leaves
        ``self._hart_tag`` unchanged and is only logged at debug level.
        """
        try:
            from hart_onboarding import generate_node_identity, get_node_identity

            # Check if already generated
            existing = get_node_identity()
            if existing and existing.get('node_tag'):
                self._hart_tag = existing['node_tag']
                return

            # Gather known tags from peer network for collision avoidance
            known_tags = set()
            try:
                peers = self._load_peers_from_db(exclude_dead=True)
                for p in peers:
                    tag = p.get('hart_tag', '')
                    if tag:
                        known_tags.add(tag)
            except Exception:
                pass  # Peer DB unavailable → generate without the tag set

            # Central element comes from env or the central we connect to
            central_element = os.environ.get('HART_CENTRAL_ELEMENT', '')

            identity = generate_node_identity(
                tier=self.tier,
                central_element=central_element or None,
                known_tags=known_tags,
            )

            self._hart_tag = identity.get('node_tag', '')
        except Exception as e:
            logger.debug(f"HART identity generation skipped: {e}")
346 def stop(self):
347 """Stop the gossip background thread."""
348 with self._lock:
349 self._running = False
350 if self._thread:
351 self._thread.join(timeout=10)
353 # ─── Background Loop ───
    def _background_loop(self):
        """Main gossip daemon loop: gossip, health-check and integrity rounds.

        Runs until ``self._running`` is cleared by ``stop()``. Each pass
        heartbeats the node watchdog, fires whichever rounds are due per
        their interval timers, then sleeps 5 seconds.
        """
        last_gossip = 0
        last_health = 0
        last_integrity = 0
        integrity_interval = int(os.environ.get('HEVOLVE_INTEGRITY_INTERVAL', '300'))
        while self._running:
            now = time.time()
            # Heartbeat to watchdog
            try:
                from security.node_watchdog import get_watchdog
                wd = get_watchdog()
                if wd:
                    wd.heartbeat('gossip')
            except Exception:
                pass  # Watchdog is optional; never let it kill the loop
            try:
                if now - last_gossip >= self.gossip_interval:
                    self._gossip_round()
                    last_gossip = now
                if now - last_health >= self.health_interval:
                    self._health_check_round()
                    last_health = now
                if now - last_integrity >= integrity_interval:
                    self._integrity_round()
                    last_integrity = now
            except Exception as e:
                logger.debug(f"Gossip loop error: {e}")
            time.sleep(5)
384 # ─── Gossip Round ───
386 def _heartbeat(self):
387 """Send heartbeat to watchdog between potentially blocking operations."""
388 try:
389 from security.node_watchdog import get_watchdog
390 wd = get_watchdog()
391 if wd:
392 wd.heartbeat('gossip')
393 except Exception:
394 pass
    def _gossip_round(self):
        """One gossip fan-out: exchange peer lists with a few random peers.

        Falls back to announcing to (at most two) seed peers when the
        local peer table is still empty. Honours the running flag and
        per-peer back-off so shutdown and dead hosts stay cheap.
        """
        # Tier-aware gossip: scope targets by tier
        if self.tier == 'flat':
            peers = self._load_peers_from_db(exclude_dead=True)
        else:
            peers = self._load_peers_by_tier()

        if not peers:
            # Retry seeds if we have no peers — limit to 2 seeds max
            # to avoid blocking for N × 5s when all seeds are unreachable
            for url in list(self.seed_peers)[:2]:
                if not self._running:
                    return
                if self._is_peer_backed_off(url):
                    continue
                self._announce_to_peer(url)
                self._heartbeat()
            return

        targets = random.sample(peers, min(self.gossip_fanout, len(peers)))
        for peer in targets:
            if not self._running:
                return
            peer_url = peer['url']
            if self._is_peer_backed_off(peer_url):
                continue
            try:
                their_peers = self._exchange_with_peer(peer_url)
                if their_peers:
                    self._merge_peer_list(their_peers)
                # Backoff is handled inside _exchange_with_peer()
            except Exception as e:
                logger.debug(f"Gossip exchange failed with {peer_url}: {e}")
                self._record_peer_failure(peer_url)
            self._heartbeat()
    def _health_check_round(self):
        """Ping known peers and advance their active/stale/dead status.

        A reachable peer becomes 'active' and refreshes last_seen; an
        unreachable one degrades to 'stale' past stale_threshold and to
        'dead' past dead_threshold, measured from last_seen (falling
        back to first_seen).
        """
        self._peer_backoff.prune_expired()
        from .models import get_db, PeerNode
        db = get_db()
        try:
            peers = db.query(PeerNode).filter(PeerNode.status != 'dead').all()
            now = datetime.utcnow()
            for peer in peers:
                if not self._running:
                    break
                if peer.node_id == self.node_id:
                    continue  # Never health-check ourselves
                reachable = self._ping_peer(peer.url)
                self._heartbeat()
                if reachable:
                    peer.last_seen = now
                    peer.status = 'active'
                else:
                    age = (now - (peer.last_seen or peer.first_seen)).total_seconds()
                    if age > self.dead_threshold:
                        peer.status = 'dead'
                    elif age > self.stale_threshold:
                        peer.status = 'stale'
            db.commit()
            # Update contribution scores for active/stale peers
            try:
                from .hosting_reward_service import HostingRewardService
                for peer in peers:
                    if peer.status in ('active', 'stale') and peer.node_id != self.node_id:
                        HostingRewardService.compute_contribution_score(db, peer.node_id)
                db.commit()
            except Exception:
                pass  # Rewards are best-effort; health state is already committed
        except Exception as e:
            db.rollback()
            logger.debug(f"Health check error: {e}")
        finally:
            db.close()
471 # ─── Announce ───
    def _announce_to_all(self):
        """Announce ourselves to every known peer URL plus all seed peers.

        Runs in a daemon thread from start(); honours per-peer back-off
        and the running flag so shutdown isn't delayed by slow hosts.
        """
        peers = self._load_peers_from_db(exclude_dead=False)
        urls = set(p['url'] for p in peers)
        # Always include seed peers — flat-mode installs MUST be allowed
        # to talk to central.hevolve.ai for the universal peer-join +
        # central report-in spec (memory:
        # project_universal_peer_join_central_report.md). The previous
        # 2026-04 fix dropped seeds entirely in flat mode to suppress
        # NameResolutionError noise, which silently broke central
        # registration for every desktop install. PeerBackoff (line ~429)
        # already absorbs DNS failures with exponential backoff, so we
        # get the announce attempt without the log spam.
        urls.update(self.seed_peers)
        for url in urls:
            if not self._running:
                return
            if url != self.base_url:
                # Skip peers that are currently backed off (exponential
                # backoff from PeerBackoff). This prevents unnecessary
                # DNS lookups for hosts that have been unreachable.
                if self._is_peer_backed_off(url):
                    continue
                self._announce_to_peer(url)
                self._heartbeat()
498 def _announce_to_peer(self, peer_url):
499 try:
500 resp = pooled_post(
501 f"{peer_url}/api/social/peers/announce",
502 json=self._self_info(),
503 timeout=5,
504 )
505 if resp.status_code == 200:
506 self._record_peer_success(peer_url)
507 return True
508 self._record_peer_failure(peer_url)
509 return False
510 except requests.RequestException:
511 self._record_peer_failure(peer_url)
512 return False
514 # ─── Exchange ───
    def _exchange_with_peer(self, peer_url):
        """Exchange peer lists with one peer; return their list or None.

        Tries an established PeerLink (WebSocket) first, then falls back
        to HTTP POST — msgpack-encoded when the bandwidth profile asks
        for it and msgpack is importable, plain JSON otherwise. Returns
        None on any transport failure or non-200 response.
        """
        try:
            payload = {
                'peers': self._gossip_peer_list(),
                'sender': self._gossip_self_info(),
            }

            # Try PeerLink first (direct WebSocket, no HTTP overhead)
            try:
                peer_id = self._url_to_node_id(peer_url)
                if peer_id:
                    from core.peer_link.link_manager import get_link_manager
                    link = get_link_manager().get_link(peer_id)
                    if link:
                        result = link.send('gossip', payload,
                                           wait_response=True, timeout=10)
                        if result is not None:
                            get_link_manager().record_http_exchange(peer_id)
                            return result.get('peers', [])
            except Exception:
                pass  # Fall through to HTTP

            # HTTP path (original)
            if self.payload_mode == 'msgpack':
                try:
                    import msgpack
                    resp = pooled_post(
                        f"{peer_url}/api/social/peers/exchange",
                        data=self._serialize_payload(payload),
                        headers={'Content-Type': 'application/msgpack'},
                        timeout=10,
                    )
                except ImportError:
                    # msgpack profile selected but lib missing → plain JSON
                    resp = pooled_post(
                        f"{peer_url}/api/social/peers/exchange",
                        json=payload, timeout=10,
                    )
            else:
                resp = pooled_post(
                    f"{peer_url}/api/social/peers/exchange",
                    json=payload, timeout=10,
                )
            if resp.status_code == 200:
                # NOTE(review): response is parsed as JSON even on the
                # msgpack request path — assumes peers always reply in
                # JSON; confirm against the exchange endpoint.
                data = resp.json()
                self._record_peer_success(peer_url)
                # Record exchange for auto-upgrade to PeerLink
                try:
                    peer_id = self._url_to_node_id(peer_url)
                    if peer_id:
                        from core.peer_link.link_manager import get_link_manager
                        get_link_manager().record_http_exchange(peer_id)
                except Exception:
                    pass
                return data.get('peers', [])
            self._record_peer_failure(peer_url)
        except requests.RequestException:
            self._record_peer_failure(peer_url)
        return None
575 def _url_to_node_id(self, peer_url: str) -> str:
576 """Look up node_id for a peer URL from the DB."""
577 try:
578 from .models import PeerNode, db_session
579 with db_session() as db:
580 peer = db.query(PeerNode).filter(
581 PeerNode.url == peer_url).first()
582 if peer:
583 return peer.node_id
584 except Exception:
585 pass
586 return ''
588 def _ping_peer(self, peer_url):
589 if self._is_peer_backed_off(peer_url):
590 return False
591 try:
592 resp = pooled_get(f"{peer_url}/api/social/peers/health", timeout=3)
593 if resp.status_code == 200:
594 self._record_peer_success(peer_url)
595 return True
596 self._record_peer_failure(peer_url)
597 return False
598 except requests.RequestException:
599 self._record_peer_failure(peer_url)
600 return False
602 # ─── Handlers (called by Flask endpoints) ───
604 def handle_announce(self, peer_data):
605 """Process an incoming peer announcement. Returns True if peer was new."""
606 from .models import get_db
607 db = get_db()
608 try:
609 is_new = self._merge_peer(db, peer_data)
610 db.commit()
611 if is_new:
612 logger.info(f"New peer discovered: {peer_data.get('name', '')} "
613 f"at {peer_data.get('url', '')}")
614 return is_new
615 except Exception as e:
616 db.rollback()
617 logger.debug(f"Announce handler error: {e}")
618 return False
619 finally:
620 db.close()
622 def broadcast(self, message: dict, targets: list = None) -> int:
623 """Broadcast a message to active peers via gossip.
625 Used by RALT skill distribution and skill queries.
626 Posts to /api/social/peers/broadcast on each target node.
628 Returns number of peers that responded with HTTP 2xx.
630 Audit fix #8 (April 2026): previously this counted every HTTP
631 response as "sent" — even 404s when the peer didn't implement
632 the broadcast endpoint. Callers thought they succeeded when
633 they hadn't. Now only 2xx counts, and non-2xx is recorded as
634 a peer failure so the back-off path kicks in.
635 """
636 peers = self._load_peers_from_db(exclude_dead=True)
637 if targets:
638 target_set = set(targets)
639 peers = [p for p in peers if p.get('node_id') in target_set]
641 sent = 0
642 for peer in peers:
643 url = peer.get('url', '')
644 if not url or peer.get('node_id') == self.node_id:
645 continue
646 if self._is_peer_backed_off(url):
647 continue
648 try:
649 resp = pooled_post(
650 f"{url}/api/social/peers/broadcast",
651 json=message,
652 timeout=5,
653 )
654 # Only 2xx counts as a successful delivery. 4xx means the
655 # peer rejected (rate-limited, missing endpoint, bad
656 # payload); 5xx means the peer errored. Either way the
657 # payload did NOT land, so don't tell callers it did.
658 if 200 <= resp.status_code < 300:
659 self._record_peer_success(url)
660 sent += 1
661 else:
662 self._record_peer_failure(url)
663 logger.debug(
664 f"gossip.broadcast to {url}: HTTP "
665 f"{resp.status_code} (type={message.get('type')})")
666 except requests.RequestException:
667 self._record_peer_failure(url)
668 return sent
670 def handle_exchange(self, their_peers):
671 """Process incoming peer list, return our peer list."""
672 if their_peers:
673 self._merge_peer_list(their_peers)
674 return self.get_peer_list()
676 # ─── Peer List ───
678 def get_peer_list(self):
679 """Return all non-dead peers as dicts, including self."""
680 peers = self._load_peers_from_db(exclude_dead=True)
681 # Include self
682 self_info = self._self_info()
683 if not any(p.get('node_id') == self.node_id for p in peers):
684 peers.append(self_info)
685 return peers
687 def get_health(self):
688 """Return this node's health info for the /health endpoint."""
689 uptime = (datetime.utcnow() - self.started_at).total_seconds()
690 peers = self._load_peers_from_db(exclude_dead=True)
691 return {
692 'node_id': self.node_id,
693 'name': self.node_name,
694 'version': self.version,
695 'uptime_seconds': int(uptime),
696 'peer_count': len(peers),
697 'agent_count': self._get_count('agent'),
698 'post_count': self._get_count('post'),
699 'status': 'healthy',
700 }
702 # ─── Internal Helpers ───
    def _self_info(self):
        """Build this node's full self-descriptor dict for gossip/announce.

        Base fields are always present; each optional section (crypto
        identity, release manifest, certificate, X25519 key, guardrail
        hash, capabilities, idle compute, upgrade info) is best-effort
        and silently omitted when its module is unavailable or raises.
        """
        info = {
            'node_id': self.node_id,
            'url': self.base_url,
            'name': self.node_name,
            'version': self.version,
            'agent_count': self._get_count('agent'),
            'post_count': self._get_count('post'),
            'timestamp': int(time.time()),
            'tier': self.tier,
            'hart_tag': self._hart_tag,
        }
        # Add cryptographic identity if available
        try:
            from security.node_integrity import get_public_key_hex, compute_code_hash, sign_json_payload
            info['public_key'] = get_public_key_hex()
            info['code_hash'] = compute_code_hash()
            # Include release manifest info if available
            try:
                from security.master_key import load_release_manifest
                manifest = load_release_manifest()
                if manifest:
                    info['release_version'] = manifest.get('version', '')
                    info['release_manifest_signature'] = manifest.get('master_signature', '')
            except Exception:
                pass
            # Include certificate for regional/central nodes
            try:
                from security.key_delegation import load_node_certificate
                cert = load_node_certificate()
                if cert:
                    info['certificate'] = cert
            except Exception:
                pass
            # Sign the fields collected so far; sections added below this
            # point are NOT covered by the signature.
            info['signature'] = sign_json_payload(info)
        except Exception:
            pass
        # Include X25519 public key for E2E encryption
        try:
            from security.channel_encryption import get_x25519_public_hex
            info['x25519_public'] = get_x25519_public_hex()
        except Exception:
            pass
        # Include guardrail hash for peer verification
        try:
            from security.hive_guardrails import get_guardrail_hash
            info['guardrail_hash'] = get_guardrail_hash()
        except Exception:
            pass
        # Include HART OS capabilities (contribution tier + enabled features)
        try:
            from security.system_requirements import get_capabilities
            caps = get_capabilities()
            if caps:
                info['capability_tier'] = caps.tier.value
                info['enabled_features'] = caps.enabled_features
                info['hardware_summary'] = {
                    'cpu_cores': caps.hardware.cpu_cores,
                    'ram_gb': caps.hardware.ram_gb,
                    'gpu_vram_gb': caps.hardware.gpu_vram_gb,
                    'disk_free_gb': caps.hardware.disk_free_gb,
                }
        except Exception:
            pass
        # Advertise idle compute availability for distributed task execution
        try:
            from integrations.coding_agent.idle_detection import IdleDetectionService
            from integrations.social.models import get_db
            db = get_db()
            try:
                idle_stats = IdleDetectionService.get_idle_stats(db)
                info['idle_compute'] = {
                    'available': idle_stats.get('currently_idle', 0) > 0,
                    'idle_agents': idle_stats.get('currently_idle', 0),
                    'opted_in': idle_stats.get('total_opted_in', 0),
                }
            finally:
                db.close()
        except Exception:
            pass
        # Advertise version info for autonomous upgrade discovery
        try:
            from integrations.agent_engine.upgrade_orchestrator import get_upgrade_orchestrator
            orch = get_upgrade_orchestrator()
            info['current_version'] = self.version
            status = orch.get_status()
            if status.get('version') and status.get('stage') == 'completed':
                info['available_version'] = status['version']
        except Exception:
            pass
        return info
    def _seed_initial_peers(self):
        """Insert seed peers into DB if not already present.

        Seed rows get a synthetic 'seed_*' node_id placeholder; our own
        row is also ensured so the local node appears in peer queries.
        Failures roll back and are only logged at debug level.
        """
        from .models import get_db, PeerNode
        db = get_db()
        try:
            for url in self.seed_peers:
                existing = db.query(PeerNode).filter(PeerNode.url == url).first()
                if not existing:
                    seed = PeerNode(
                        node_id=f'seed_{uuid.uuid4().hex[:12]}',
                        url=url, name='seed', version='',
                        status='active',
                    )
                    db.add(seed)
            # Also ensure self is in DB
            self_peer = db.query(PeerNode).filter(
                PeerNode.node_id == self.node_id).first()
            if not self_peer:
                self_peer = PeerNode(
                    node_id=self.node_id, url=self.base_url,
                    name=self.node_name, version=self.version,
                    status='active',
                )
                db.add(self_peer)
            db.commit()
        except Exception as e:
            db.rollback()
            logger.debug(f"Seed peers init error: {e}")
        finally:
            db.close()
827 def _merge_peer_list(self, peer_list):
828 """Merge a list of peer dicts into the DB."""
829 from .models import get_db
830 db = get_db()
831 try:
832 new_count = 0
833 for p in peer_list:
834 if p.get('node_id') and p.get('node_id') != self.node_id:
835 if self._merge_peer(db, p):
836 new_count += 1
837 if new_count > 0:
838 logger.info(f"Gossip: merged {new_count} new peers")
839 db.commit()
840 except Exception as e:
841 db.rollback()
842 logger.debug(f"Merge peer list error: {e}")
843 finally:
844 db.close()
846 def _merge_peer(self, db, peer_data):
847 """Upsert a single peer into PeerNode table. Returns True if new.
848 Verifies Ed25519 signature if present. Rejects banned nodes."""
849 from .models import PeerNode
850 node_id = peer_data.get('node_id')
851 url = peer_data.get('url', '').rstrip('/')
852 if not node_id or not url or node_id == self.node_id:
853 return False
855 # Sybil protection: max 5 nodes per IP/hostname.
856 # Loopback addresses are exempt - single-user dev installs
857 # naturally accumulate many node_ids on localhost (one per
858 # reboot / data-dir reset / clean-install), and rejecting
859 # them as Sybil is a false positive that floods WARNING logs
860 # AND blocks legitimate self-peer registration during testing.
861 # Real Sybil attacks come from distinct external IPs.
862 try:
863 from urllib.parse import urlparse
864 host = (urlparse(url).hostname or '').lower()
865 _is_loopback = host in (
866 'localhost', '127.0.0.1', '::1', '0.0.0.0',
867 ) or host.startswith('127.')
868 if host and not _is_loopback:
869 from .models import PeerNode
870 same_host_count = db.query(PeerNode).filter(
871 PeerNode.url.contains(host),
872 PeerNode.integrity_status != 'banned',
873 ).count()
874 max_per_ip = int(os.environ.get('HEVOLVE_MAX_PEERS_PER_IP', '5'))
875 if same_host_count >= max_per_ip:
876 logger.warning(f"Sybil limit: {same_host_count} nodes from {host}, rejecting {node_id[:8]}")
877 return False
878 except Exception:
879 pass # URL parsing failed — proceed with other checks
881 # Reject banned nodes
882 existing = db.query(PeerNode).filter(PeerNode.node_id == node_id).first()
883 if existing and existing.integrity_status == 'banned':
884 logger.debug(f"Rejecting banned node: {node_id[:8]}")
885 return False
887 # Verify signature if present (backward-compatible: unsigned peers accepted as 'unverified')
888 signature = peer_data.get('signature')
889 public_key = peer_data.get('public_key')
890 signature_valid = False
891 if signature and public_key:
892 try:
893 from security.node_integrity import verify_json_signature
894 # Build payload without signature for verification
895 payload = {k: v for k, v in peer_data.items() if k != 'signature'}
896 signature_valid = verify_json_signature(public_key, payload, signature)
897 if not signature_valid:
898 logger.warning(f"Invalid signature from node {node_id[:8]} at {url}")
899 return False
900 except ImportError:
901 pass # crypto module not available, accept unsigned
902 except Exception as e:
903 logger.warning(f"Unexpected error verifying signature for {node_id[:8]}: {e}")
904 return False # Reject on unexpected verification errors
906 integrity_status = 'verified' if signature_valid else 'unverified'
908 # Enforcement gate: reject unsigned peers in hard mode
909 if not signature_valid:
910 try:
911 from security.master_key import get_enforcement_mode
912 enforcement = get_enforcement_mode()
913 if enforcement == 'hard':
914 logger.warning(f"Rejecting unsigned peer {node_id[:8]} (enforcement=hard)")
915 return False
916 elif enforcement == 'soft':
917 logger.info(f"Unsigned peer {node_id[:8]} accepted (enforcement=soft)")
918 except ImportError:
919 pass # No enforcement module = dev mode, accept all
921 # Guardrail hash verification: reject peers with different guardrail values
922 peer_guardrail_hash = peer_data.get('guardrail_hash', '')
923 if peer_guardrail_hash:
924 try:
925 from security.hive_guardrails import get_guardrail_hash
926 local_guardrail_hash = get_guardrail_hash()
927 if peer_guardrail_hash != local_guardrail_hash:
928 logger.warning(
929 f"Rejecting peer {node_id[:8]}: guardrail hash mismatch")
930 return False
931 except Exception:
932 pass
934 # Code hash verification: check against release hash registry (multi-version)
935 # then fall back to current manifest
936 master_key_verified = False
937 hash_trusted_source = 'untrusted'
938 peer_code_hash = peer_data.get('code_hash', '')
939 if peer_code_hash:
940 # Priority 1: Release hash registry (supports rolling upgrades)
941 try:
942 from security.release_hash_registry import get_release_hash_registry
943 registry = get_release_hash_registry()
944 if registry.is_known_release_hash(peer_code_hash):
945 master_key_verified = True
946 hash_trusted_source = 'registry'
947 except Exception:
948 pass
950 # Priority 2: Current release manifest (fallback if registry unavailable)
951 if not master_key_verified:
952 try:
953 from security.master_key import load_release_manifest
954 manifest = load_release_manifest()
955 if manifest:
956 expected_hash = manifest.get('code_hash', '')
957 if peer_code_hash == expected_hash:
958 master_key_verified = True
959 hash_trusted_source = 'manifest'
960 except Exception:
961 pass
963 # Enforcement: reject unknown hashes in hard mode
964 if not master_key_verified:
965 try:
966 from security.master_key import get_enforcement_mode
967 enforcement = get_enforcement_mode()
968 if enforcement == 'hard':
969 logger.warning(
970 f"Rejecting peer {node_id[:8]}: unknown code hash "
971 f"{peer_code_hash[:16]}... (enforcement=hard)")
972 return False
973 elif enforcement == 'soft':
974 logger.warning(
975 f"Peer {node_id[:8]} unknown code hash "
976 f"(enforcement=soft, allowing)")
977 except Exception:
978 pass
980 # Certificate verification for peers claiming regional/central tier
981 peer_tier = peer_data.get('tier', 'flat')
982 certificate = peer_data.get('certificate')
983 certificate_verified = False
984 if peer_tier in ('regional', 'central') and not certificate:
985 logger.warning(f"Rejecting {node_id[:8]}: {peer_tier} tier requires certificate")
986 return False
987 if peer_tier in ('regional', 'central') and certificate:
988 try:
989 from security.key_delegation import verify_certificate_chain
990 from security.master_key import get_enforcement_mode
991 chain_result = verify_certificate_chain(certificate)
992 certificate_verified = chain_result['valid']
993 enforcement = get_enforcement_mode()
994 if not certificate_verified:
995 # Always reject invalid certificates for regional/central tiers
996 if peer_tier in ('regional', 'central'):
997 logger.warning(f"Rejecting peer {node_id[:8]}: {peer_tier} tier requires valid certificate")
998 return False
999 if enforcement == 'hard':
1000 logger.warning(f"Rejecting peer {node_id[:8]}: invalid certificate "
1001 f"for tier={peer_tier} (enforcement=hard)")
1002 return False
1003 else:
1004 logger.warning(f"Peer {node_id[:8]} has invalid certificate "
1005 f"for tier={peer_tier} (enforcement={enforcement})")
1006 except Exception as e:
1007 logger.debug(f"Certificate verification error for {node_id[:8]}: {e}")
1009 if existing:
1010 existing.last_seen = datetime.utcnow()
1011 existing.url = url
1012 existing.name = peer_data.get('name', existing.name)
1013 existing.version = peer_data.get('version', existing.version)
1014 existing.agent_count = peer_data.get('agent_count', existing.agent_count)
1015 existing.post_count = peer_data.get('post_count', existing.post_count)
1016 # Update integrity fields
1017 if public_key:
1018 existing.public_key = public_key
1019 if peer_data.get('code_hash'):
1020 existing.code_hash = peer_data['code_hash']
1021 if peer_data.get('version'):
1022 existing.code_version = peer_data['version']
1023 if signature_valid:
1024 existing.integrity_status = 'verified'
1025 existing.master_key_verified = master_key_verified
1026 if peer_data.get('release_version'):
1027 existing.release_version = peer_data['release_version']
1028 # Update tier/certificate fields
1029 existing.tier = peer_tier
1030 if certificate:
1031 existing.certificate_json = certificate
1032 existing.certificate_verified = certificate_verified
1033 # Update capability tier from HART OS equilibrium
1034 if peer_data.get('capability_tier'):
1035 existing.capability_tier = peer_data['capability_tier']
1036 if peer_data.get('enabled_features'):
1037 existing.enabled_features_json = peer_data['enabled_features']
1038 # Update X25519 public key for E2E encryption
1039 if peer_data.get('x25519_public'):
1040 existing.x25519_public = peer_data['x25519_public']
1041 if existing.status == 'dead':
1042 # Only resurrect if announcement is recent (not stale gossip)
1043 if (datetime.utcnow() - existing.last_seen).total_seconds() < 60:
1044 existing.status = 'active'
1045 return False
1047 new_peer = PeerNode(
1048 node_id=node_id, url=url,
1049 name=peer_data.get('name', ''),
1050 version=peer_data.get('version', ''),
1051 status='active',
1052 agent_count=peer_data.get('agent_count', 0),
1053 post_count=peer_data.get('post_count', 0),
1054 metadata_json=peer_data.get('metadata', {}),
1055 public_key=public_key or '',
1056 code_hash=peer_data.get('code_hash', ''),
1057 code_version=peer_data.get('version', ''),
1058 integrity_status=integrity_status,
1059 master_key_verified=master_key_verified,
1060 release_version=peer_data.get('release_version', ''),
1061 tier=peer_tier,
1062 certificate_json=certificate,
1063 certificate_verified=certificate_verified,
1064 capability_tier=peer_data.get('capability_tier'),
1065 enabled_features_json=peer_data.get('enabled_features'),
1066 x25519_public=peer_data.get('x25519_public', ''),
1067 )
1068 db.add(new_peer)
1070 # ─── Seamless Mind Merge ───
1071 # Valid peer accepted - auto-federate so minds merge without friction.
1072 # Connection is a breeze; the audit layer handles trust continuously.
1073 threading.Thread(
1074 target=self._auto_federate_peer,
1075 args=(node_id, url),
1076 daemon=True,
1077 ).start()
1079 return True
1081 def _auto_federate_peer(self, peer_node_id: str, peer_url: str):
1082 """Auto-follow a newly accepted peer for seamless mind merge.
1083 Valid peers get instant bidirectional content sharing - no manual step."""
1084 try:
1085 from .models import get_db
1086 from .federation import federation
1087 db = get_db()
1088 try:
1089 # Follow them (we receive their content)
1090 federation.follow_instance(db, self.node_id, peer_node_id, peer_url)
1091 db.commit()
1092 logger.info(f"Mind merge: auto-federated with {peer_node_id[:8]} at {peer_url}")
1093 except Exception as e:
1094 db.rollback()
1095 logger.debug(f"Auto-federation failed for {peer_node_id[:8]}: {e}")
1096 finally:
1097 db.close()
1098 except Exception:
1099 pass
1101 # ─── Integrity Round ───
    def _integrity_round(self) -> None:
        """Periodic integrity check: continuous audit using ALL active nodes.

        Every node audits every other node it can reach - not just one random peer.
        Valid connections are a breeze; continuous audit is the price of trust.

        Order of operations: two local self-checks (code, then guardrails)
        that stop gossip entirely on failure, followed by peer-facing phases
        that each commit (or roll back) independently so a failure in one
        phase cannot poison the session state of the next.
        """
        # Self-check: verify own code integrity before challenging others.
        # A tampered local node must not keep gossiping.
        try:
            from security.runtime_monitor import is_code_healthy
            if not is_code_healthy():
                logger.critical("Integrity round: local code tampered, stopping gossip")
                self.stop()
                return
        except Exception:
            pass  # Monitor unavailable — skip the self-check rather than halt

        # Self-check: verify own guardrail integrity
        try:
            from security.hive_guardrails import verify_guardrail_integrity
            if not verify_guardrail_integrity():
                logger.critical("Integrity round: guardrail integrity failed, stopping gossip")
                self.stop()
                return
        except Exception:
            pass

        from .models import get_db, PeerNode
        db = get_db()
        try:
            # Audit targets: every live peer except ourselves and banned nodes.
            active_peers = db.query(PeerNode).filter(
                PeerNode.status == 'active',
                PeerNode.node_id != self.node_id,
                PeerNode.integrity_status != 'banned',
            ).all()

            if active_peers:
                from .integrity_service import IntegrityService

                # 1. Guardrail audit: re-verify ALL active peers' guardrail hashes.
                # This is the continuous audit - every node checks every other node.
                for peer in active_peers:
                    self._audit_peer_guardrails(db, peer)
                    self._heartbeat()  # keep the watchdog fed during the long loop

                # 2. Deep challenge: cycle through challenge types across all peers.
                # Each peer gets a different challenge type per round (round-robin).
                challenge_types = ['agent_count_verify', 'code_hash_check',
                                   'stats_probe', 'guardrail_verify']
                for i, peer in enumerate(active_peers):
                    challenge_type = challenge_types[i % len(challenge_types)]
                    try:
                        IntegrityService.create_challenge(
                            db, self.node_id, peer.node_id,
                            peer.url, challenge_type)
                    except Exception as e:
                        logger.debug(f"Challenge to {peer.node_id[:8]} failed: {e}")
                    self._heartbeat()
                try:
                    db.commit()
                except Exception:
                    db.rollback()

                # 3. Run full fraud detection on ALL active peers
                for peer in active_peers:
                    try:
                        IntegrityService.detect_impression_anomaly(db, peer.node_id)
                        IntegrityService.detect_score_jump(db, peer.node_id)
                        IntegrityService.detect_collusion(db, peer.node_id)
                    except Exception as e:
                        logger.debug(f"Fraud detection for {peer.node_id[:8]} failed: {e}")
                try:
                    db.commit()
                except Exception:
                    db.rollback()

                # 4. Verify audit compute dominance — no node can outcompute its auditors
                try:
                    # NOTE(review): IntegrityService was already imported above;
                    # this re-import is redundant but harmless.
                    from .integrity_service import IntegrityService
                    for peer in active_peers:
                        IntegrityService.verify_audit_dominance(db, peer.node_id)
                    db.commit()
                except Exception as e:
                    db.rollback()
                    logger.debug(f"Audit dominance check failed: {e}")

            # 5. Pull registry ban list if configured
            registry_url = os.environ.get('HEVOLVE_REGISTRY_URL', '')
            if registry_url:
                try:
                    from .integrity_service import IntegrityService
                    banned_ids = IntegrityService.check_registry_ban_list(registry_url)
                    if banned_ids:
                        for nid in banned_ids:
                            peer = db.query(PeerNode).filter_by(node_id=nid).first()
                            if peer and peer.integrity_status != 'banned':
                                peer.integrity_status = 'banned'
                                logger.info(f"Node {nid[:8]} banned via registry")
                        db.commit()
                except Exception as e:
                    logger.debug(f"Registry ban list check failed: {e}")

        except Exception as e:
            logger.debug(f"Integrity round error: {e}")
        finally:
            db.close()
1207 def _audit_peer_guardrails(self, db, peer):
1208 """Re-verify a peer's guardrail hash by directly querying it.
1209 This is the continuous audit — every node verifies every other node."""
1210 try:
1211 resp = pooled_get(
1212 f"{peer.url}/api/social/integrity/guardrail-hash",
1213 timeout=5,
1214 )
1215 if resp.status_code != 200:
1216 return # Endpoint might not exist on older nodes
1218 data = resp.json()
1219 peer_hash = data.get('guardrail_hash', '')
1220 if not peer_hash:
1221 return
1223 from security.hive_guardrails import get_guardrail_hash
1224 local_hash = get_guardrail_hash()
1226 if peer_hash != local_hash:
1227 logger.warning(
1228 f"Continuous audit: guardrail drift detected on "
1229 f"{peer.node_id[:8]} — disconnecting")
1230 from .integrity_service import IntegrityService
1231 IntegrityService.increase_fraud_score(
1232 db, peer.node_id, 50.0,
1233 'Guardrail hash drift detected during continuous audit',
1234 {'expected': local_hash[:16], 'got': peer_hash[:16]})
1235 # Severe: unfollow from federation immediately
1236 try:
1237 from .federation import federation
1238 federation.unfollow_instance(db, self.node_id, peer.node_id)
1239 except Exception:
1240 pass
1241 else:
1242 # Peer passed — reward good behavior
1243 from .integrity_service import IntegrityService
1244 IntegrityService.decrease_fraud_score(
1245 db, peer.node_id, 1.0,
1246 'Guardrail audit passed')
1247 except requests.RequestException:
1248 pass # Network issue — will catch it next round
1249 except Exception as e:
1250 logger.debug(f"Guardrail audit for {peer.node_id[:8]} error: {e}")
1252 def _load_peers_by_tier(self):
1253 """Load gossip targets scoped to this node's tier."""
1254 from .models import get_db
1255 db = get_db()
1256 try:
1257 from .hierarchy_service import HierarchyService
1258 return HierarchyService.get_gossip_targets(db, self.node_id, self.tier)
1259 except Exception:
1260 return []
1261 finally:
1262 db.close()
1264 def _load_peers_from_db(self, exclude_dead=True):
1265 from .models import get_db, PeerNode
1266 db = get_db()
1267 try:
1268 q = db.query(PeerNode)
1269 if exclude_dead:
1270 q = q.filter(PeerNode.status != 'dead')
1271 peers = q.all()
1272 return [p.to_dict() for p in peers]
1273 except Exception:
1274 return []
1275 finally:
1276 db.close()
1278 def _get_count(self, what):
1279 try:
1280 from .models import get_db, User, Post
1281 db = get_db()
1282 try:
1283 if what == 'agent':
1284 return db.query(User).filter(User.user_type == 'agent').count()
1285 elif what == 'post':
1286 return db.query(Post).filter(Post.is_deleted == False).count()
1287 return 0
1288 finally:
1289 db.close()
1290 except Exception:
1291 return 0
1294# ═══════════════════════════════════════════════════════════════════════
1295# AutoDiscovery — Zero-Config LAN Peer Finding via UDP Broadcast
1296# ═══════════════════════════════════════════════════════════════════════
class AutoDiscovery:
    """LAN-based zero-config peer discovery using UDP broadcast.

    After boot verification, broadcasts a signed beacon every 30s on UDP port 6780.
    Listens for beacons from other nodes on the same network.
    Discovered peers are fed into GossipProtocol as additional seeds.

    This is ADDITIVE — works alongside seed peers and registry.
    """

    # Prefix identifying our beacon packets on the wire.
    BEACON_MAGIC = b'HEVOLVE_DISCO_V1'
    # Upper bound passed to recvfrom(); beacons larger than this are truncated.
    MAX_PACKET_SIZE = 2048

    def __init__(self, gossip_protocol: GossipProtocol,
                 port: int = None, beacon_interval: int = None):
        self._gossip = gossip_protocol
        from core.port_registry import get_port
        # Legacy env var takes precedence for backward compat
        _legacy = os.environ.get('HEVOLVE_DISCOVERY_PORT')
        self._port = port or (int(_legacy) if _legacy else get_port('discovery'))
        self._beacon_interval = beacon_interval or int(
            os.environ.get('HEVOLVE_DISCOVERY_INTERVAL', '30'))
        self._running = False
        self._send_thread = None
        self._recv_thread = None
        self._lock = threading.Lock()
        # node_ids we have already seen via LAN beacon.
        # NOTE(review): this set grows without bound for the process lifetime;
        # acceptable on a LAN, but worth confirming for long-lived deployments.
        self._discovered_nodes: set = set()
        self._sock = None
        # Cached list of broadcast addresses (one per usable IPv4 NIC).
        # Refreshed on start; iterated each beacon send. Replaces the
        # naive `<broadcast>` (255.255.255.255) target which on multi-NIC
        # Windows boxes (Wi-Fi + Hyper-V + VMware + Docker virtual NICs)
        # leaves the box on a single OS-chosen interface — usually a
        # virtual subnet, not the physical LAN where peers actually live.
        self._broadcast_targets: list = []

    def start(self) -> None:
        """Start beacon sender and listener threads.

        Binds a broadcast-capable UDP socket; on bind failure the
        discovery subsystem is disabled (logged, no exception raised).
        """
        import socket as _socket
        with self._lock:
            if self._running:
                return
            self._running = True

        try:
            self._sock = _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM)
            self._sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_BROADCAST, 1)
            self._sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
            self._sock.bind(('', self._port))
            # 2s timeout so _recv_loop wakes regularly to heartbeat.
            self._sock.settimeout(2.0)
        except OSError as e:
            logger.warning(f"AutoDiscovery: cannot bind UDP port {self._port}: {e}")
            self._running = False
            return

        # Enumerate per-NIC broadcast addresses. Always include the
        # limited-broadcast 255.255.255.255 as a fallback.
        self._broadcast_targets = self._enumerate_broadcast_targets()
        logger.info(f"AutoDiscovery broadcast targets: "
                    f"{', '.join(self._broadcast_targets) or '<broadcast>'}")

        self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
        self._recv_thread.start()
        self._send_thread = threading.Thread(target=self._send_loop, daemon=True)
        self._send_thread.start()
        logger.info(f"AutoDiscovery started on UDP port {self._port} "
                    f"(interval={self._beacon_interval}s)")

    # NIC name patterns that indicate a virtual/tunnel adapter we
    # should NOT broadcast onto. Case-insensitive substring match.
    # Catches WSL/Hyper-V vSwitch, VMware/VirtualBox host-only adapters,
    # Bluetooth PAN, Docker bridges, and Windows loopback.
    _VIRTUAL_NIC_HINTS = (
        'loopback', 'pseudo', 'bluetooth', 'vethernet', 'wsl',
        'hyper-v', 'vmware', 'virtualbox', 'vbox', 'docker',
        'tap-', 'tun', 'npcap',
    )

    @staticmethod
    def _derive_broadcast(addr: str, netmask: str) -> str:
        """Compute IPv4 broadcast = addr | ~netmask. Returns '' on parse failure."""
        try:
            a = [int(x) for x in addr.split('.')]
            m = [int(x) for x in netmask.split('.')]
            if len(a) != 4 or len(m) != 4:
                return ''
            # Per-octet: set every host bit (bits cleared in the netmask).
            bcast = [(a[i] | (~m[i] & 0xFF)) for i in range(4)]
            return '.'.join(str(b) for b in bcast)
        except Exception:
            return ''

    def _enumerate_broadcast_targets(self) -> list:
        """Return one broadcast address per usable IPv4 NIC.

        On Windows, ``sock.sendto((b'…', '<broadcast>', port))`` only
        traverses the OS-chosen default-route interface. On boxes with
        multiple physical/virtual NICs this is roulette — the beacon
        often leaves on a virtual NIC the LAN peers aren't on.

        Implementation notes:
        - psutil returns ``snic.broadcast = None`` on Windows even for
          NICs with valid IPv4 addresses, so we derive broadcast from
          ``address | ~netmask`` ourselves.
        - We skip virtual / tunnel NICs by name (Bluetooth, vEthernet,
          WSL, Hyper-V, VMware, Docker, loopback) so a beacon never
          leaks into a virtual subnet our LAN peers aren't on.
        - Fallback: if no usable NIC is found, return the limited
          broadcast so degraded environments still emit something.
        """
        try:
            import psutil
        except ImportError:
            # No psutil — only the limited broadcast is available.
            return ['255.255.255.255']
        targets = []
        try:
            stats = {}
            try:
                stats = psutil.net_if_stats()
            except Exception:
                pass
            for nic_name, addrs in psutil.net_if_addrs().items():
                # Skip virtual/tunnel NICs by name pattern.
                lower_name = nic_name.lower()
                if any(hint in lower_name for hint in self._VIRTUAL_NIC_HINTS):
                    continue
                # Skip if the NIC is down (when stats are available).
                nic_stat = stats.get(nic_name)
                if nic_stat is not None and not nic_stat.isup:
                    continue
                for snic in addrs:
                    if getattr(snic, 'family', None) is None:
                        continue
                    # snic.family may be an enum (has .value) or a raw int.
                    fam_val = int(snic.family) if hasattr(snic.family, 'value') else snic.family
                    if fam_val != 2:  # AF_INET
                        continue
                    addr = snic.address or ''
                    netmask = snic.netmask or ''
                    bcast = snic.broadcast or ''
                    # Skip loopback (127.x) and APIPA (169.254.x).
                    if addr.startswith('127.') or addr.startswith('169.254.'):
                        continue
                    # Derive broadcast on Windows (psutil leaves it None).
                    if not bcast and netmask:
                        bcast = self._derive_broadcast(addr, netmask)
                    if not bcast:
                        continue
                    if bcast in ('0.0.0.0', '255.255.255.255'):
                        continue  # Treat as "no useful broadcast"
                    if bcast not in targets:
                        targets.append(bcast)
        except Exception as e:
            logger.debug(f"AutoDiscovery NIC enumeration error: {e}")
        # Always keep limited broadcast as a final fallback so a host
        # without psutil-readable NICs (rare degraded environments) still
        # gets at least one outbound packet.
        if '255.255.255.255' not in targets:
            targets.append('255.255.255.255')
        return targets

    def stop(self) -> None:
        """Stop discovery threads and close socket.

        Closing the socket unblocks _recv_loop's recvfrom with OSError,
        which exits cleanly because _running is already False.
        """
        with self._lock:
            self._running = False
            if self._sock:
                try:
                    self._sock.close()
                except Exception:
                    pass

    def _build_beacon(self) -> bytes:
        """Build a signed beacon packet: MAGIC + JSON payload.

        Every enrichment (guardrail hash, keys, release version, robot
        capabilities) is best-effort; the signature is computed over the
        payload as it stands just before the 'signature' key is added.
        """
        import json as _json
        payload = {
            'type': 'hevolve-discovery',
            'node_id': self._gossip.node_id,
            'url': self._gossip.base_url,
            'name': self._gossip.node_name,
            'version': self._gossip.version,
            'tier': self._gossip.tier,
            'timestamp': int(time.time()),
        }
        try:
            from security.hive_guardrails import get_guardrail_hash
            payload['guardrail_hash'] = get_guardrail_hash()
        except Exception:
            pass
        try:
            # NOTE(review): sign_json_payload is imported here but only used
            # in the later block — redundant import, harmless.
            from security.node_integrity import (
                get_public_key_hex, compute_code_hash, sign_json_payload,
            )
            payload['public_key'] = get_public_key_hex()
            payload['code_hash'] = compute_code_hash()
        except Exception:
            pass
        # Include release version if manifest available
        try:
            from security.master_key import load_release_manifest
            manifest = load_release_manifest()
            if manifest:
                payload['release_version'] = manifest.get('version', '')
        except Exception:
            pass
        # Include X25519 public key for E2E encryption
        try:
            from security.channel_encryption import get_x25519_public_hex
            payload['x25519_public'] = get_x25519_public_hex()
        except Exception:
            pass
        # Include robot capabilities for fleet dispatch
        try:
            from integrations.robotics.capability_advertiser import (
                get_capability_advertiser,
            )
            adv = get_capability_advertiser()
            payload['robot_capabilities'] = adv.get_gossip_payload()
        except Exception:
            pass
        try:
            from security.node_integrity import sign_json_payload
            payload['signature'] = sign_json_payload(payload)
        except Exception:
            pass

        # Compact separators keep the packet well under MAX_PACKET_SIZE.
        json_bytes = _json.dumps(payload, separators=(',', ':')).encode('utf-8')
        return self.BEACON_MAGIC + json_bytes

    def _parse_beacon(self, data: bytes) -> dict:
        """Parse and verify a beacon packet. Returns payload dict or empty dict.

        Verification layers, in order: magic prefix, JSON decode, type tag,
        self-echo filter, guardrail hash, code hash (enforcement-mode aware),
        Ed25519 signature, and staleness (±5 minutes).
        """
        import json as _json
        if not data.startswith(self.BEACON_MAGIC):
            return {}
        try:
            json_bytes = data[len(self.BEACON_MAGIC):]
            payload = _json.loads(json_bytes.decode('utf-8'))
        except (ValueError, UnicodeDecodeError):
            return {}

        if payload.get('type') != 'hevolve-discovery':
            return {}
        # Ignore our own beacon echoed back by the network.
        if payload.get('node_id') == self._gossip.node_id:
            return {}

        # Verify guardrail hash
        peer_hash = payload.get('guardrail_hash', '')
        if peer_hash:
            try:
                from security.hive_guardrails import get_guardrail_hash
                if peer_hash != get_guardrail_hash():
                    logger.debug(f"AutoDiscovery: rejecting beacon from "
                                 f"{payload.get('node_id', '?')[:8]}: guardrail mismatch")
                    return {}
            except Exception:
                pass

        # Verify code hash against release hash registry
        peer_code_hash = payload.get('code_hash', '')
        if peer_code_hash:
            try:
                from security.release_hash_registry import get_release_hash_registry
                from security.master_key import get_enforcement_mode
                registry = get_release_hash_registry()
                if not registry.is_known_release_hash(peer_code_hash):
                    enforcement = get_enforcement_mode()
                    if enforcement == 'hard':
                        logger.warning(
                            f"AutoDiscovery: rejecting beacon from "
                            f"{payload.get('node_id', '?')[:8]}: "
                            f"unknown code hash {peer_code_hash[:16]}...")
                        return {}
                    elif enforcement in ('soft', 'warn'):
                        logger.info(
                            f"AutoDiscovery: unknown code hash from "
                            f"{payload.get('node_id', '?')[:8]} "
                            f"(enforcement={enforcement})")
            except Exception:
                pass

        # Verify Ed25519 signature
        # NOTE(review): beacons without sig/pubkey skip this check entirely —
        # presumably intentional (unsigned dev nodes), confirm against policy.
        sig = payload.get('signature')
        pubkey = payload.get('public_key')
        if sig and pubkey:
            try:
                from security.node_integrity import verify_json_signature
                # Signature covers the payload minus the 'signature' key itself.
                clean = {k: v for k, v in payload.items() if k != 'signature'}
                if not verify_json_signature(pubkey, clean, sig):
                    logger.warning(f"AutoDiscovery: invalid signature from "
                                   f"{payload.get('node_id', '?')[:8]}")
                    return {}
            except Exception:
                pass

        # Reject stale beacons (> 5 minutes old)
        ts = payload.get('timestamp', 0)
        if abs(time.time() - ts) > 300:
            return {}

        return payload

    def _send_loop(self) -> None:
        """Periodically broadcast beacon on LAN.

        Sleeps via ``NodeWatchdog.sleep_with_heartbeat`` so an interval
        longer than the watchdog's frozen threshold (30s × 10 = 300s by
        default) can't age the heartbeat out mid-sleep. The default
        beacon interval is well below the threshold, but running with
        HEVOLVE_BEACON_INTERVAL=600 (or similar operator overrides) used
        to trigger the restart cascade documented in the 2026-04-11
        incident.
        """
        # NOTE(review): the docstring references HEVOLVE_BEACON_INTERVAL but
        # __init__ reads HEVOLVE_DISCOVERY_INTERVAL — confirm which env var
        # operators actually use.
        while self._running:
            try:
                beacon = self._build_beacon()
                # Send the beacon to every usable per-NIC broadcast
                # address (Win11 multi-NIC: Wi-Fi + Hyper-V + VMware +
                # Docker virtuals). A failure on one NIC must not
                # abort the round.
                targets = self._broadcast_targets or ['255.255.255.255']
                for tgt in targets:
                    try:
                        self._sock.sendto(beacon, (tgt, self._port))
                    except Exception as e:
                        logger.debug(f"AutoDiscovery send to {tgt} failed: {e}")
            except Exception as e:
                logger.debug(f"AutoDiscovery send error: {e}")
            try:
                from security.node_watchdog import get_watchdog
                wd = get_watchdog()
                if wd is not None:
                    wd.sleep_with_heartbeat(
                        'auto_discovery', self._beacon_interval,
                        stop_check=lambda: not self._running,
                    )
                    continue
            except Exception:
                pass
            # Fallback path if watchdog is unavailable: plain sleep +
            # best-effort heartbeat. Preserves original behavior.
            time.sleep(self._beacon_interval)

    def _recv_loop(self) -> None:
        """Listen for beacons from other nodes on the network.

        The socket has a 2s timeout (set in start()) so recvfrom wakes
        regularly and we can heartbeat between calls. The heartbeat is
        now emitted on BOTH timeout and successful receipt — the
        previous code only heartbeated on timeout, so a node that kept
        receiving packets every 2s could still let the heartbeat age
        if the recv path itself blocked past the frozen threshold.
        """
        import socket as _socket

        def _wd_heartbeat_safe():
            # Best-effort watchdog heartbeat; never raises.
            try:
                from security.node_watchdog import get_watchdog
                wd = get_watchdog()
                if wd:
                    wd.heartbeat('auto_discovery')
            except Exception:
                pass

        while self._running:
            try:
                data, addr = self._sock.recvfrom(self.MAX_PACKET_SIZE)
            except _socket.timeout:
                _wd_heartbeat_safe()
                continue
            except OSError:
                # Socket closed by stop() → exit; otherwise retry.
                if not self._running:
                    break
                continue
            # Successful receipt — refresh heartbeat before processing
            # the payload, which involves JSON parsing + gossip handoff
            # and could itself block for a noticeable fraction of a second.
            _wd_heartbeat_safe()

            payload = self._parse_beacon(data)
            if not payload:
                continue

            node_id = payload.get('node_id')
            # NOTE(review): a verified beacon missing 'node_id' would reach
            # the logging f-string below with node_id=None — confirm
            # _parse_beacon can never admit such a payload.
            if node_id in self._discovered_nodes:
                continue

            self._discovered_nodes.add(node_id)
            url = payload.get('url', '')
            logger.info(f"AutoDiscovery: found node "
                        f"{payload.get('name', node_id[:8])} at {url} via LAN")

            # Feed into gossip
            try:
                self._gossip.handle_announce(payload)
            except Exception:
                pass
            try:
                self._gossip._announce_to_peer(url)
            except Exception:
                pass
# Module-level singletons: one gossip identity and one LAN discoverer per
# process, created at import time. AutoDiscovery wraps the same gossip
# instance so both subsystems share a single node identity.
gossip = GossipProtocol()
auto_discovery = AutoDiscovery(gossip)
def get_peer_discovery() -> GossipProtocol:
    """Accessor for the process-wide GossipProtocol singleton.

    Use this instead of importing the module-level ``gossip`` binding
    directly (e.g. `integrations.agent_engine.compute_borrowing` for
    compute-offer broadcast). Every call hands back the same object,
    so gossip identity stays stable.
    """
    return gossip
def get_auto_discovery() -> AutoDiscovery:
    """Accessor for the process-wide AutoDiscovery singleton.

    Used by standalone runners (systemd unit, NixOS module) so they
    pick up the AutoDiscovery already wired to the shared gossip
    singleton instead of constructing their own.
    """
    return auto_discovery