Coverage for integrations / social / peer_discovery.py: 59.5%

887 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Decentralized Gossip Peer Discovery 

3Fully decentralized protocol for HevolveBot instances to discover each other. 

4No central registry. Peers exchange peer lists via gossip, new nodes propagate automatically. 

5""" 

6import os 

7import uuid 

8import time 

9import random 

10import logging 

11import threading 

12import requests 

13from datetime import datetime, timedelta 

14 

15from core.http_pool import pooled_get, pooled_post 

16 

17logger = logging.getLogger('hevolve_social') 

18 

19 

def _load_or_create_node_id() -> str:
    """Return a stable node id, persisted under the platform data dir.

    The id is stored in ``<data_dir>/node_id.json`` so subsequent boots
    reuse the same value and the central side can dedupe joins by
    node_id. If the data directory is unusable (read-only / degraded
    installs such as cx_Freeze), a fresh in-memory uuid is returned.
    """
    import json
    try:
        from core.platform_paths import get_data_dir
        storage_dir = get_data_dir()
        os.makedirs(storage_dir, exist_ok=True)
        id_path = os.path.join(storage_dir, 'node_id.json')
        if os.path.exists(id_path):
            try:
                with open(id_path, 'r', encoding='utf-8') as handle:
                    stored = json.load(handle)
                    existing_id = stored.get('node_id', '')
                    if existing_id:
                        return existing_id
            except Exception:
                pass  # Corrupt file → regenerate below
        fresh_id = str(uuid.uuid4())
        try:
            with open(id_path, 'w', encoding='utf-8') as handle:
                json.dump(
                    {'node_id': fresh_id,
                     'created_at': datetime.utcnow().isoformat()},
                    handle,
                )
        except Exception:
            pass  # Best-effort persist; in-memory id is still valid for this boot
        return fresh_id
    except Exception:
        return str(uuid.uuid4())

52 

53 

# ═══════════════════════════════════════════════════════════════════════
# Bandwidth Profiles - auto-selected by tier, override via env
# ═══════════════════════════════════════════════════════════════════════

# Timing/fanout settings per profile. The profile is chosen in
# GossipProtocol.__init__ via _TIER_BANDWIDTH_MAP; each numeric value
# can still be overridden individually through HEVOLVE_* env vars.
BANDWIDTH_PROFILES = {
    'full': {
        'gossip_interval': 60,    # seconds between gossip rounds
        'health_interval': 120,   # seconds between peer health sweeps
        'gossip_fanout': 3,       # peers contacted per gossip round
        'payload_mode': 'json',  # Full JSON with all fields
        'stale_threshold': 300,   # seconds unseen before status 'stale'
        'dead_threshold': 900,    # seconds unseen before status 'dead'
    },
    'constrained': {
        'gossip_interval': 300,
        'health_interval': 600,
        'gossip_fanout': 2,
        'payload_mode': 'json_compact',  # Stripped optional fields
        'stale_threshold': 900,
        'dead_threshold': 2700,
    },
    'minimal': {
        'gossip_interval': 900,
        'health_interval': 1800,
        'gossip_fanout': 1,
        'payload_mode': 'msgpack',  # ~60% smaller than JSON
        'stale_threshold': 2700,
        'dead_threshold': 7200,
    },
}

84 

# Bandwidth profile lookup — handles BOTH classification dimensions:
# 1. Capability tier (NodeTierLevel): embedded, observer, lite, standard, full, compute_host
#    → describes what this node CAN do based on hardware
# 2. Topology mode (HEVOLVE_NODE_TIER): flat, regional, central
#    → describes WHERE this node sits in the network hierarchy
# Lookup order: capability_tier first, then topology mode as fallback
# (see GossipProtocol.__init__: cap_tier or self.tier)
# Values are keys into BANDWIDTH_PROFILES above.
_TIER_BANDWIDTH_MAP = {
    # Capability tiers (from security/system_requirements.py NodeTierLevel)
    'embedded': 'minimal',
    'observer': 'constrained',
    'lite': 'constrained',
    'standard': 'full',
    'full': 'full',
    'compute_host': 'full',
    # Topology modes (fallback when capability_tier is not yet resolved)
    'flat': 'full',
    'regional': 'full',
    'central': 'full',
}

105 

# Compact payload: only essential fields for gossip on constrained links.
# Used by _gossip_self_info()/_gossip_peer_list() when payload_mode is
# 'json_compact' — any field not listed here (hardware summary, idle
# compute, release info, …) is stripped before sending.
_COMPACT_FIELDS = frozenset({
    'node_id', 'url', 'public_key', 'guardrail_hash', 'code_hash',
    'signature', 'tier', 'capability_tier', 'timestamp', 'hart_tag',
})

111 

112 

113class GossipProtocol: 

114 """Gossip-based peer discovery for HevolveBot network.""" 

115 

    def __init__(self):
        """Build node identity, resolve tier + bandwidth profile, and
        parse seed peers. No network traffic happens here — announcing
        and the gossip loop only begin in start().
        """
        # Identity — persisted across restarts so the central side can
        # dedupe joins by node_id. Without persistence, every watchdog
        # restart of the gossip thread fabricates a fresh uuid and the
        # central dashboard sees infinite "new node" rows for a single
        # install (witnessed 2026-04-30: same install logged
        # node=a7ba8cc9 then node=d016058b in one server.log).
        self.node_id = _load_or_create_node_id()
        self.node_name = os.environ.get(
            'HEVOLVE_NODE_NAME', f'hevolve-{self.node_id[:8]}')
        from core.port_registry import get_port
        self.base_url = os.environ.get(
            'HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}').rstrip('/')
        self.version = '1.0.0'
        self.started_at = datetime.utcnow()

        # Hierarchy configuration (needed before bandwidth selection)
        try:
            from security.key_delegation import get_node_tier
            self.tier = get_node_tier()
        except ImportError:
            # Degraded/dev install without the security package: flat topology
            self.tier = 'flat'

        # Bandwidth profile: auto-select from tier, allow env override
        self.bandwidth_profile = os.environ.get('HEVOLVE_GOSSIP_BANDWIDTH', '')
        if not self.bandwidth_profile:
            # Auto-select from capability tier (from system_requirements)
            cap_tier = ''
            try:
                from security.system_requirements import get_capabilities
                caps = get_capabilities()
                if caps:
                    cap_tier = caps.tier.value
            except Exception:
                pass
            # Capability tier wins; topology mode is the fallback key
            self.bandwidth_profile = _TIER_BANDWIDTH_MAP.get(
                cap_tier or self.tier, 'full')
        profile = BANDWIDTH_PROFILES.get(self.bandwidth_profile, BANDWIDTH_PROFILES['full'])

        # Configuration - profile defaults, overridable by env
        self.gossip_interval = int(os.environ.get(
            'HEVOLVE_GOSSIP_INTERVAL', str(profile['gossip_interval'])))
        self.health_interval = int(os.environ.get(
            'HEVOLVE_HEALTH_INTERVAL', str(profile['health_interval'])))
        self.stale_threshold = int(os.environ.get(
            'HEVOLVE_STALE_THRESHOLD', str(profile['stale_threshold'])))
        self.dead_threshold = int(os.environ.get(
            'HEVOLVE_DEAD_THRESHOLD', str(profile['dead_threshold'])))
        self.gossip_fanout = int(os.environ.get(
            'HEVOLVE_GOSSIP_FANOUT', str(profile['gossip_fanout'])))
        self.payload_mode = profile['payload_mode']

        self.central_url = os.environ.get('HEVOLVE_CENTRAL_URL', '').rstrip('/')
        self.regional_url = os.environ.get('HEVOLVE_REGIONAL_URL', '').rstrip('/')

        # Parse seed peers — env override + canonical genesis peers.
        # Genesis peers prevent bootstrap poisoning: even if env var is
        # compromised, the node always knows at least the real network.
        # Sourced from `core.superadmins.ALL_CENTRAL_URLS` (primary +
        # fallback) so gossip can find EITHER the .hevolve.ai central
        # OR the azurekong.hertzai.com fallback — single source of
        # truth, no parallel literals that could drift (Gate 4 / DRY).
        try:
            from core.superadmins import ALL_CENTRAL_URLS
            _GENESIS_PEERS = list(ALL_CENTRAL_URLS)
        except Exception:
            # Degraded environment fallback (cx_Freeze import chain race
            # at very-early boot). Mirror the canonical default.
            _GENESIS_PEERS = [
                'https://central.hevolve.ai',
                'https://azurekong.hertzai.com',
            ]
        seed_str = os.environ.get('HEVOLVE_SEED_PEERS', '')
        env_peers = [
            u.strip().rstrip('/') for u in seed_str.split(',')
            if u.strip()
        ]
        # Merge: env peers first (user-specified), then genesis (always present)
        seen = set()
        self.seed_peers = []
        for url in env_peers + _GENESIS_PEERS:
            if url not in seen:
                seen.add(url)
                self.seed_peers.append(url)

        # HART node identity (loaded on start)
        self._hart_tag = ''

        # State
        self._running = False
        self._thread = None
        self._lock = threading.Lock()

        # Exponential backoff for unreachable peers
        from core.circuit_breaker import PeerBackoff
        self._peer_backoff = PeerBackoff(initial=10, maximum=300)

        logger.info(
            f"Gossip bandwidth: {self.bandwidth_profile} "
            f"(gossip={self.gossip_interval}s, health={self.health_interval}s, "
            f"fanout={self.gossip_fanout}, payload={self.payload_mode})"
        )

218 

219 # ─── Peer Backoff (delegates to core.circuit_breaker.PeerBackoff) ─── 

220 

221 def _is_peer_backed_off(self, peer_url: str) -> bool: 

222 return self._peer_backoff.is_backed_off(peer_url) 

223 

224 def _record_peer_failure(self, peer_url: str): 

225 self._peer_backoff.record_failure(peer_url) 

226 

227 def _record_peer_success(self, peer_url: str): 

228 self._peer_backoff.record_success(peer_url) 

229 

230 # ─── Payload Serialization ─── 

231 

232 def _gossip_self_info(self): 

233 """Return self info appropriate for current bandwidth profile.""" 

234 info = self._self_info() 

235 if self.payload_mode == 'json_compact': 

236 return {k: v for k, v in info.items() if k in _COMPACT_FIELDS} 

237 return info 

238 

239 def _gossip_peer_list(self): 

240 """Return peer list appropriate for current bandwidth profile.""" 

241 peers = self.get_peer_list() 

242 if self.payload_mode == 'json_compact': 

243 return [{k: v for k, v in p.items() if k in _COMPACT_FIELDS} 

244 for p in peers] 

245 return peers 

246 

247 @staticmethod 

248 def _serialize_payload(data) -> bytes: 

249 """Serialize payload using msgpack if available, else JSON.""" 

250 try: 

251 import msgpack 

252 return msgpack.packb(data, use_bin_type=True) 

253 except ImportError: 

254 import json 

255 return json.dumps(data).encode('utf-8') 

256 

257 @staticmethod 

258 def _deserialize_payload(raw: bytes): 

259 """Deserialize payload from msgpack or JSON.""" 

260 try: 

261 import msgpack 

262 return msgpack.unpackb(raw, raw=False) 

263 except ImportError: 

264 import json 

265 return json.loads(raw.decode('utf-8')) 

266 except Exception: 

267 import json 

268 return json.loads(raw.decode('utf-8')) 

269 

    def start(self):
        """Load peers from DB, announce to seeds/known peers, start background thread.

        Idempotent: a second call while already running is a no-op
        (guarded by self._lock / self._running). All network work runs
        on daemon threads, so start() itself returns quickly.
        """
        with self._lock:
            if self._running:
                return
            self._running = True

        # Generate HART node identity if not yet established
        self._ensure_hart_identity()

        # Seed peers into DB
        self._seed_initial_peers()

        # Announce self to all known peers (non-blocking)
        threading.Thread(target=self._announce_to_all, daemon=True).start()

        # Start gossip background loop
        self._thread = threading.Thread(target=self._background_loop, daemon=True)
        self._thread.start()
        logger.info(f"Gossip started: node={self.node_id[:8]}, "
                    f"name={self.node_name}, hart_tag={self._hart_tag}, "
                    f"seeds={len(self.seed_peers)}, "
                    f"bandwidth={self.bandwidth_profile}")

        # Start the central report-in loop. Every install — bundled,
        # Docker, ISO — reports identity to the canonical superadmin
        # allowlist (core.superadmins.SUPERADMIN_CENTRAL_URLS) so the
        # superadmin dashboard sees every node that has ever joined.
        # Offline-tolerant via outbox; cheap when centrals are
        # unreachable (PeerBackoff handles DNS + connect timeouts).
        try:
            from core.superadmin_report import start_background_loop
            start_background_loop(self._self_info)
        except Exception as e:
            logger.debug(f"Superadmin report-in loop not started: {e}")

305 

    def _ensure_hart_identity(self):
        """Generate HART node identity on first startup. Like getting an IP address.

        - Central: picks a unique element → @element
        - Regional: picks a unique spirit → @central.spirit
        - Flat: no node tag needed (users get individual tags)

        Best-effort: any failure (missing hart_onboarding module, DB
        error) is logged at debug level and leaves self._hart_tag as ''.
        """
        try:
            from hart_onboarding import generate_node_identity, get_node_identity

            # Check if already generated
            existing = get_node_identity()
            if existing and existing.get('node_tag'):
                self._hart_tag = existing['node_tag']
                return

            # Gather known tags from peer network for collision avoidance
            known_tags = set()
            try:
                peers = self._load_peers_from_db(exclude_dead=True)
                for p in peers:
                    tag = p.get('hart_tag', '')
                    if tag:
                        known_tags.add(tag)
            except Exception:
                pass  # no peer tags available — proceed without them

            # Central element comes from env or the central we connect to
            central_element = os.environ.get('HART_CENTRAL_ELEMENT', '')

            identity = generate_node_identity(
                tier=self.tier,
                central_element=central_element or None,
                known_tags=known_tags,
            )

            self._hart_tag = identity.get('node_tag', '')
        except Exception as e:
            logger.debug(f"HART identity generation skipped: {e}")

345 

346 def stop(self): 

347 """Stop the gossip background thread.""" 

348 with self._lock: 

349 self._running = False 

350 if self._thread: 

351 self._thread.join(timeout=10) 

352 

353 # ─── Background Loop ─── 

354 

    def _background_loop(self):
        """Main gossip scheduler thread: gossip / health / integrity rounds.

        Ticks every 5 seconds while self._running. Each round fires when
        its own interval has elapsed; round errors are logged at debug
        level and never kill the loop. Also heartbeats the node watchdog
        on every tick so the watchdog knows the gossip thread is alive.
        """
        last_gossip = 0
        last_health = 0
        last_integrity = 0
        integrity_interval = int(os.environ.get('HEVOLVE_INTEGRITY_INTERVAL', '300'))
        while self._running:
            now = time.time()
            # Heartbeat to watchdog
            try:
                from security.node_watchdog import get_watchdog
                wd = get_watchdog()
                if wd:
                    wd.heartbeat('gossip')
            except Exception:
                pass  # watchdog optional — never block the loop on it
            try:
                if now - last_gossip >= self.gossip_interval:
                    self._gossip_round()
                    last_gossip = now
                if now - last_health >= self.health_interval:
                    self._health_check_round()
                    last_health = now
                if now - last_integrity >= integrity_interval:
                    self._integrity_round()
                    last_integrity = now
            except Exception as e:
                logger.debug(f"Gossip loop error: {e}")
            time.sleep(5)

383 

384 # ─── Gossip Round ─── 

385 

386 def _heartbeat(self): 

387 """Send heartbeat to watchdog between potentially blocking operations.""" 

388 try: 

389 from security.node_watchdog import get_watchdog 

390 wd = get_watchdog() 

391 if wd: 

392 wd.heartbeat('gossip') 

393 except Exception: 

394 pass 

395 

    def _gossip_round(self):
        """One gossip round: exchange peer lists with a random fan-out.

        Flat tier draws targets from all live peers; other tiers scope
        candidates via _load_peers_by_tier(). When no peers are known,
        falls back to re-announcing to at most two seed peers. Heartbeats
        the watchdog after each (potentially slow) network call.
        """
        # Tier-aware gossip: scope targets by tier
        if self.tier == 'flat':
            peers = self._load_peers_from_db(exclude_dead=True)
        else:
            peers = self._load_peers_by_tier()

        if not peers:
            # Retry seeds if we have no peers — limit to 2 seeds max
            # to avoid blocking for N × 5s when all seeds are unreachable
            for url in list(self.seed_peers)[:2]:
                if not self._running:
                    return
                if self._is_peer_backed_off(url):
                    continue
                self._announce_to_peer(url)
                self._heartbeat()
            return

        targets = random.sample(peers, min(self.gossip_fanout, len(peers)))
        for peer in targets:
            if not self._running:
                return
            peer_url = peer['url']
            if self._is_peer_backed_off(peer_url):
                continue
            try:
                their_peers = self._exchange_with_peer(peer_url)
                if their_peers:
                    self._merge_peer_list(their_peers)
                # Backoff is handled inside _exchange_with_peer()
            except Exception as e:
                logger.debug(f"Gossip exchange failed with {peer_url}: {e}")
                self._record_peer_failure(peer_url)
            self._heartbeat()

431 

    def _health_check_round(self):
        """Ping every non-dead peer and update its status in the DB.

        Reachable peers become 'active' with a fresh last_seen;
        unreachable ones degrade to 'stale' then 'dead' based on how
        long since last_seen (profile thresholds). Afterwards,
        contribution scores for active/stale peers are refreshed on a
        best-effort basis.
        """
        self._peer_backoff.prune_expired()
        from .models import get_db, PeerNode
        db = get_db()
        try:
            peers = db.query(PeerNode).filter(PeerNode.status != 'dead').all()
            now = datetime.utcnow()
            for peer in peers:
                if not self._running:
                    break
                if peer.node_id == self.node_id:
                    continue  # never health-check ourselves
                reachable = self._ping_peer(peer.url)
                self._heartbeat()  # each ping can take up to 3s
                if reachable:
                    peer.last_seen = now
                    peer.status = 'active'
                else:
                    # Age from last successful contact (or first sighting)
                    age = (now - (peer.last_seen or peer.first_seen)).total_seconds()
                    if age > self.dead_threshold:
                        peer.status = 'dead'
                    elif age > self.stale_threshold:
                        peer.status = 'stale'
            db.commit()
            # Update contribution scores for active/stale peers
            try:
                from .hosting_reward_service import HostingRewardService
                for peer in peers:
                    if peer.status in ('active', 'stale') and peer.node_id != self.node_id:
                        HostingRewardService.compute_contribution_score(db, peer.node_id)
                db.commit()
            except Exception:
                pass  # rewards are optional; never fail the health sweep
        except Exception as e:
            db.rollback()
            logger.debug(f"Health check error: {e}")
        finally:
            db.close()

470 

471 # ─── Announce ─── 

472 

    def _announce_to_all(self):
        """Announce ourselves once to every known peer URL plus all seeds.

        Runs on a daemon thread from start(); respects self._running and
        per-peer back-off, and heartbeats the watchdog between posts.
        """
        peers = self._load_peers_from_db(exclude_dead=False)
        urls = set(p['url'] for p in peers)
        # Always include seed peers — flat-mode installs MUST be allowed
        # to talk to central.hevolve.ai for the universal peer-join +
        # central report-in spec (memory:
        # project_universal_peer_join_central_report.md). The previous
        # 2026-04 fix dropped seeds entirely in flat mode to suppress
        # NameResolutionError noise, which silently broke central
        # registration for every desktop install. PeerBackoff (line ~429)
        # already absorbs DNS failures with exponential backoff, so we
        # get the announce attempt without the log spam.
        urls.update(self.seed_peers)
        for url in urls:
            if not self._running:
                return
            if url != self.base_url:
                # Skip peers that are currently backed off (exponential
                # backoff from PeerBackoff). This prevents unnecessary
                # DNS lookups for hosts that have been unreachable.
                if self._is_peer_backed_off(url):
                    continue
                self._announce_to_peer(url)
                self._heartbeat()

497 

498 def _announce_to_peer(self, peer_url): 

499 try: 

500 resp = pooled_post( 

501 f"{peer_url}/api/social/peers/announce", 

502 json=self._self_info(), 

503 timeout=5, 

504 ) 

505 if resp.status_code == 200: 

506 self._record_peer_success(peer_url) 

507 return True 

508 self._record_peer_failure(peer_url) 

509 return False 

510 except requests.RequestException: 

511 self._record_peer_failure(peer_url) 

512 return False 

513 

514 # ─── Exchange ─── 

515 

    def _exchange_with_peer(self, peer_url):
        """Bidirectional peer-list exchange with one peer.

        Transport order: (1) an existing PeerLink WebSocket if one is
        up for this peer, (2) HTTP POST with a msgpack body when the
        bandwidth profile requests it and msgpack is importable,
        (3) plain JSON HTTP POST. Returns the peer's peer list, or None
        on transport failure / non-200 (back-off recorded either way).
        """
        try:
            payload = {
                'peers': self._gossip_peer_list(),
                'sender': self._gossip_self_info(),
            }

            # Try PeerLink first (direct WebSocket, no HTTP overhead)
            try:
                peer_id = self._url_to_node_id(peer_url)
                if peer_id:
                    from core.peer_link.link_manager import get_link_manager
                    link = get_link_manager().get_link(peer_id)
                    if link:
                        result = link.send('gossip', payload,
                                           wait_response=True, timeout=10)
                        if result is not None:
                            get_link_manager().record_http_exchange(peer_id)
                            return result.get('peers', [])
            except Exception:
                pass  # Fall through to HTTP

            # HTTP path (original)
            if self.payload_mode == 'msgpack':
                # Import probe only — serialization happens in
                # _serialize_payload(), which re-checks availability.
                try:
                    import msgpack
                    resp = pooled_post(
                        f"{peer_url}/api/social/peers/exchange",
                        data=self._serialize_payload(payload),
                        headers={'Content-Type': 'application/msgpack'},
                        timeout=10,
                    )
                except ImportError:
                    resp = pooled_post(
                        f"{peer_url}/api/social/peers/exchange",
                        json=payload, timeout=10,
                    )
            else:
                resp = pooled_post(
                    f"{peer_url}/api/social/peers/exchange",
                    json=payload, timeout=10,
                )
            if resp.status_code == 200:
                data = resp.json()
                self._record_peer_success(peer_url)
                # Record exchange for auto-upgrade to PeerLink
                try:
                    peer_id = self._url_to_node_id(peer_url)
                    if peer_id:
                        from core.peer_link.link_manager import get_link_manager
                        get_link_manager().record_http_exchange(peer_id)
                except Exception:
                    pass
                return data.get('peers', [])
            self._record_peer_failure(peer_url)
        except requests.RequestException:
            self._record_peer_failure(peer_url)
        return None

574 

575 def _url_to_node_id(self, peer_url: str) -> str: 

576 """Look up node_id for a peer URL from the DB.""" 

577 try: 

578 from .models import PeerNode, db_session 

579 with db_session() as db: 

580 peer = db.query(PeerNode).filter( 

581 PeerNode.url == peer_url).first() 

582 if peer: 

583 return peer.node_id 

584 except Exception: 

585 pass 

586 return '' 

587 

588 def _ping_peer(self, peer_url): 

589 if self._is_peer_backed_off(peer_url): 

590 return False 

591 try: 

592 resp = pooled_get(f"{peer_url}/api/social/peers/health", timeout=3) 

593 if resp.status_code == 200: 

594 self._record_peer_success(peer_url) 

595 return True 

596 self._record_peer_failure(peer_url) 

597 return False 

598 except requests.RequestException: 

599 self._record_peer_failure(peer_url) 

600 return False 

601 

602 # ─── Handlers (called by Flask endpoints) ─── 

603 

604 def handle_announce(self, peer_data): 

605 """Process an incoming peer announcement. Returns True if peer was new.""" 

606 from .models import get_db 

607 db = get_db() 

608 try: 

609 is_new = self._merge_peer(db, peer_data) 

610 db.commit() 

611 if is_new: 

612 logger.info(f"New peer discovered: {peer_data.get('name', '')} " 

613 f"at {peer_data.get('url', '')}") 

614 return is_new 

615 except Exception as e: 

616 db.rollback() 

617 logger.debug(f"Announce handler error: {e}") 

618 return False 

619 finally: 

620 db.close() 

621 

    def broadcast(self, message: dict, targets: list = None) -> int:
        """Broadcast a message to active peers via gossip.

        Used by RALT skill distribution and skill queries.
        Posts to /api/social/peers/broadcast on each target node.

        Args:
            message: JSON-serializable dict; receivers read message['type'].
            targets: optional list of node_ids to restrict delivery to;
                None means every non-dead peer.

        Returns number of peers that responded with HTTP 2xx.

        Audit fix #8 (April 2026): previously this counted every HTTP
        response as "sent" — even 404s when the peer didn't implement
        the broadcast endpoint. Callers thought they succeeded when
        they hadn't. Now only 2xx counts, and non-2xx is recorded as
        a peer failure so the back-off path kicks in.
        """
        peers = self._load_peers_from_db(exclude_dead=True)
        if targets:
            target_set = set(targets)
            peers = [p for p in peers if p.get('node_id') in target_set]

        sent = 0
        for peer in peers:
            url = peer.get('url', '')
            if not url or peer.get('node_id') == self.node_id:
                continue  # skip malformed rows and ourselves
            if self._is_peer_backed_off(url):
                continue
            try:
                resp = pooled_post(
                    f"{url}/api/social/peers/broadcast",
                    json=message,
                    timeout=5,
                )
                # Only 2xx counts as a successful delivery. 4xx means the
                # peer rejected (rate-limited, missing endpoint, bad
                # payload); 5xx means the peer errored. Either way the
                # payload did NOT land, so don't tell callers it did.
                if 200 <= resp.status_code < 300:
                    self._record_peer_success(url)
                    sent += 1
                else:
                    self._record_peer_failure(url)
                    logger.debug(
                        f"gossip.broadcast to {url}: HTTP "
                        f"{resp.status_code} (type={message.get('type')})")
            except requests.RequestException:
                self._record_peer_failure(url)
        return sent

669 

670 def handle_exchange(self, their_peers): 

671 """Process incoming peer list, return our peer list.""" 

672 if their_peers: 

673 self._merge_peer_list(their_peers) 

674 return self.get_peer_list() 

675 

676 # ─── Peer List ─── 

677 

678 def get_peer_list(self): 

679 """Return all non-dead peers as dicts, including self.""" 

680 peers = self._load_peers_from_db(exclude_dead=True) 

681 # Include self 

682 self_info = self._self_info() 

683 if not any(p.get('node_id') == self.node_id for p in peers): 

684 peers.append(self_info) 

685 return peers 

686 

687 def get_health(self): 

688 """Return this node's health info for the /health endpoint.""" 

689 uptime = (datetime.utcnow() - self.started_at).total_seconds() 

690 peers = self._load_peers_from_db(exclude_dead=True) 

691 return { 

692 'node_id': self.node_id, 

693 'name': self.node_name, 

694 'version': self.version, 

695 'uptime_seconds': int(uptime), 

696 'peer_count': len(peers), 

697 'agent_count': self._get_count('agent'), 

698 'post_count': self._get_count('post'), 

699 'status': 'healthy', 

700 } 

701 

702 # ─── Internal Helpers ─── 

703 

    def _self_info(self):
        """Full self-descriptor dict gossiped/announced to peers.

        The base fields are always present; every try/except section
        below then adds an optional group (crypto identity + release
        manifest + certificate, X25519 key, guardrail hash, HART OS
        capabilities, idle compute, upgrade info) on a best-effort
        basis — a missing module simply leaves that group out.

        NOTE(review): 'signature' is computed inside the first section,
        so fields appended afterwards (x25519_public, guardrail_hash,
        capability_tier, idle_compute, …) are not covered by it — verify
        this matches the receiver-side verification in _merge_peer.
        """
        info = {
            'node_id': self.node_id,
            'url': self.base_url,
            'name': self.node_name,
            'version': self.version,
            'agent_count': self._get_count('agent'),
            'post_count': self._get_count('post'),
            'timestamp': int(time.time()),
            'tier': self.tier,
            'hart_tag': self._hart_tag,
        }
        # Add cryptographic identity if available
        try:
            from security.node_integrity import get_public_key_hex, compute_code_hash, sign_json_payload
            info['public_key'] = get_public_key_hex()
            info['code_hash'] = compute_code_hash()
            # Include release manifest info if available
            try:
                from security.master_key import load_release_manifest
                manifest = load_release_manifest()
                if manifest:
                    info['release_version'] = manifest.get('version', '')
                    info['release_manifest_signature'] = manifest.get('master_signature', '')
            except Exception:
                pass
            # Include certificate for regional/central nodes
            try:
                from security.key_delegation import load_node_certificate
                cert = load_node_certificate()
                if cert:
                    info['certificate'] = cert
            except Exception:
                pass
            info['signature'] = sign_json_payload(info)
        except Exception:
            pass
        # Include X25519 public key for E2E encryption
        try:
            from security.channel_encryption import get_x25519_public_hex
            info['x25519_public'] = get_x25519_public_hex()
        except Exception:
            pass
        # Include guardrail hash for peer verification
        try:
            from security.hive_guardrails import get_guardrail_hash
            info['guardrail_hash'] = get_guardrail_hash()
        except Exception:
            pass
        # Include HART OS capabilities (contribution tier + enabled features)
        try:
            from security.system_requirements import get_capabilities
            caps = get_capabilities()
            if caps:
                info['capability_tier'] = caps.tier.value
                info['enabled_features'] = caps.enabled_features
                info['hardware_summary'] = {
                    'cpu_cores': caps.hardware.cpu_cores,
                    'ram_gb': caps.hardware.ram_gb,
                    'gpu_vram_gb': caps.hardware.gpu_vram_gb,
                    'disk_free_gb': caps.hardware.disk_free_gb,
                }
        except Exception:
            pass
        # Advertise idle compute availability for distributed task execution
        try:
            from integrations.coding_agent.idle_detection import IdleDetectionService
            from integrations.social.models import get_db
            db = get_db()
            try:
                idle_stats = IdleDetectionService.get_idle_stats(db)
                info['idle_compute'] = {
                    'available': idle_stats.get('currently_idle', 0) > 0,
                    'idle_agents': idle_stats.get('currently_idle', 0),
                    'opted_in': idle_stats.get('total_opted_in', 0),
                }
            finally:
                db.close()
        except Exception:
            pass
        # Advertise version info for autonomous upgrade discovery
        try:
            from integrations.agent_engine.upgrade_orchestrator import get_upgrade_orchestrator
            orch = get_upgrade_orchestrator()
            info['current_version'] = self.version
            status = orch.get_status()
            if status.get('version') and status.get('stage') == 'completed':
                info['available_version'] = status['version']
        except Exception:
            pass
        return info

795 

    def _seed_initial_peers(self):
        """Insert seed peers into DB if not already present.

        Seed rows get a synthetic 'seed_<hex>' node_id until the real
        identity is learned via gossip. Also ensures our own PeerNode
        row exists so we appear in exchanged peer lists.
        """
        from .models import get_db, PeerNode
        db = get_db()
        try:
            for url in self.seed_peers:
                existing = db.query(PeerNode).filter(PeerNode.url == url).first()
                if not existing:
                    seed = PeerNode(
                        node_id=f'seed_{uuid.uuid4().hex[:12]}',
                        url=url, name='seed', version='',
                        status='active',
                    )
                    db.add(seed)
            # Also ensure self is in DB
            self_peer = db.query(PeerNode).filter(
                PeerNode.node_id == self.node_id).first()
            if not self_peer:
                self_peer = PeerNode(
                    node_id=self.node_id, url=self.base_url,
                    name=self.node_name, version=self.version,
                    status='active',
                )
                db.add(self_peer)
            db.commit()
        except Exception as e:
            db.rollback()
            logger.debug(f"Seed peers init error: {e}")
        finally:
            db.close()

826 

827 def _merge_peer_list(self, peer_list): 

828 """Merge a list of peer dicts into the DB.""" 

829 from .models import get_db 

830 db = get_db() 

831 try: 

832 new_count = 0 

833 for p in peer_list: 

834 if p.get('node_id') and p.get('node_id') != self.node_id: 

835 if self._merge_peer(db, p): 

836 new_count += 1 

837 if new_count > 0: 

838 logger.info(f"Gossip: merged {new_count} new peers") 

839 db.commit() 

840 except Exception as e: 

841 db.rollback() 

842 logger.debug(f"Merge peer list error: {e}") 

843 finally: 

844 db.close() 

845 

846 def _merge_peer(self, db, peer_data): 

847 """Upsert a single peer into PeerNode table. Returns True if new. 

848 Verifies Ed25519 signature if present. Rejects banned nodes.""" 

849 from .models import PeerNode 

850 node_id = peer_data.get('node_id') 

851 url = peer_data.get('url', '').rstrip('/') 

852 if not node_id or not url or node_id == self.node_id: 

853 return False 

854 

855 # Sybil protection: max 5 nodes per IP/hostname. 

856 # Loopback addresses are exempt - single-user dev installs 

857 # naturally accumulate many node_ids on localhost (one per 

858 # reboot / data-dir reset / clean-install), and rejecting 

859 # them as Sybil is a false positive that floods WARNING logs 

860 # AND blocks legitimate self-peer registration during testing. 

861 # Real Sybil attacks come from distinct external IPs. 

862 try: 

863 from urllib.parse import urlparse 

864 host = (urlparse(url).hostname or '').lower() 

865 _is_loopback = host in ( 

866 'localhost', '127.0.0.1', '::1', '0.0.0.0', 

867 ) or host.startswith('127.') 

868 if host and not _is_loopback: 

869 from .models import PeerNode 

870 same_host_count = db.query(PeerNode).filter( 

871 PeerNode.url.contains(host), 

872 PeerNode.integrity_status != 'banned', 

873 ).count() 

874 max_per_ip = int(os.environ.get('HEVOLVE_MAX_PEERS_PER_IP', '5')) 

875 if same_host_count >= max_per_ip: 

876 logger.warning(f"Sybil limit: {same_host_count} nodes from {host}, rejecting {node_id[:8]}") 

877 return False 

878 except Exception: 

879 pass # URL parsing failed — proceed with other checks 

880 

881 # Reject banned nodes 

882 existing = db.query(PeerNode).filter(PeerNode.node_id == node_id).first() 

883 if existing and existing.integrity_status == 'banned': 

884 logger.debug(f"Rejecting banned node: {node_id[:8]}") 

885 return False 

886 

887 # Verify signature if present (backward-compatible: unsigned peers accepted as 'unverified') 

888 signature = peer_data.get('signature') 

889 public_key = peer_data.get('public_key') 

890 signature_valid = False 

891 if signature and public_key: 

892 try: 

893 from security.node_integrity import verify_json_signature 

894 # Build payload without signature for verification 

895 payload = {k: v for k, v in peer_data.items() if k != 'signature'} 

896 signature_valid = verify_json_signature(public_key, payload, signature) 

897 if not signature_valid: 

898 logger.warning(f"Invalid signature from node {node_id[:8]} at {url}") 

899 return False 

900 except ImportError: 

901 pass # crypto module not available, accept unsigned 

902 except Exception as e: 

903 logger.warning(f"Unexpected error verifying signature for {node_id[:8]}: {e}") 

904 return False # Reject on unexpected verification errors 

905 

906 integrity_status = 'verified' if signature_valid else 'unverified' 

907 

908 # Enforcement gate: reject unsigned peers in hard mode 

909 if not signature_valid: 

910 try: 

911 from security.master_key import get_enforcement_mode 

912 enforcement = get_enforcement_mode() 

913 if enforcement == 'hard': 

914 logger.warning(f"Rejecting unsigned peer {node_id[:8]} (enforcement=hard)") 

915 return False 

916 elif enforcement == 'soft': 

917 logger.info(f"Unsigned peer {node_id[:8]} accepted (enforcement=soft)") 

918 except ImportError: 

919 pass # No enforcement module = dev mode, accept all 

920 

921 # Guardrail hash verification: reject peers with different guardrail values 

922 peer_guardrail_hash = peer_data.get('guardrail_hash', '') 

923 if peer_guardrail_hash: 

924 try: 

925 from security.hive_guardrails import get_guardrail_hash 

926 local_guardrail_hash = get_guardrail_hash() 

927 if peer_guardrail_hash != local_guardrail_hash: 

928 logger.warning( 

929 f"Rejecting peer {node_id[:8]}: guardrail hash mismatch") 

930 return False 

931 except Exception: 

932 pass 

933 

934 # Code hash verification: check against release hash registry (multi-version) 

935 # then fall back to current manifest 

936 master_key_verified = False 

937 hash_trusted_source = 'untrusted' 

938 peer_code_hash = peer_data.get('code_hash', '') 

939 if peer_code_hash: 

940 # Priority 1: Release hash registry (supports rolling upgrades) 

941 try: 

942 from security.release_hash_registry import get_release_hash_registry 

943 registry = get_release_hash_registry() 

944 if registry.is_known_release_hash(peer_code_hash): 

945 master_key_verified = True 

946 hash_trusted_source = 'registry' 

947 except Exception: 

948 pass 

949 

950 # Priority 2: Current release manifest (fallback if registry unavailable) 

951 if not master_key_verified: 

952 try: 

953 from security.master_key import load_release_manifest 

954 manifest = load_release_manifest() 

955 if manifest: 

956 expected_hash = manifest.get('code_hash', '') 

957 if peer_code_hash == expected_hash: 

958 master_key_verified = True 

959 hash_trusted_source = 'manifest' 

960 except Exception: 

961 pass 

962 

963 # Enforcement: reject unknown hashes in hard mode 

964 if not master_key_verified: 

965 try: 

966 from security.master_key import get_enforcement_mode 

967 enforcement = get_enforcement_mode() 

968 if enforcement == 'hard': 

969 logger.warning( 

970 f"Rejecting peer {node_id[:8]}: unknown code hash " 

971 f"{peer_code_hash[:16]}... (enforcement=hard)") 

972 return False 

973 elif enforcement == 'soft': 

974 logger.warning( 

975 f"Peer {node_id[:8]} unknown code hash " 

976 f"(enforcement=soft, allowing)") 

977 except Exception: 

978 pass 

979 

980 # Certificate verification for peers claiming regional/central tier 

981 peer_tier = peer_data.get('tier', 'flat') 

982 certificate = peer_data.get('certificate') 

983 certificate_verified = False 

984 if peer_tier in ('regional', 'central') and not certificate: 

985 logger.warning(f"Rejecting {node_id[:8]}: {peer_tier} tier requires certificate") 

986 return False 

987 if peer_tier in ('regional', 'central') and certificate: 

988 try: 

989 from security.key_delegation import verify_certificate_chain 

990 from security.master_key import get_enforcement_mode 

991 chain_result = verify_certificate_chain(certificate) 

992 certificate_verified = chain_result['valid'] 

993 enforcement = get_enforcement_mode() 

994 if not certificate_verified: 

995 # Always reject invalid certificates for regional/central tiers 

996 if peer_tier in ('regional', 'central'): 

997 logger.warning(f"Rejecting peer {node_id[:8]}: {peer_tier} tier requires valid certificate") 

998 return False 

999 if enforcement == 'hard': 

1000 logger.warning(f"Rejecting peer {node_id[:8]}: invalid certificate " 

1001 f"for tier={peer_tier} (enforcement=hard)") 

1002 return False 

1003 else: 

1004 logger.warning(f"Peer {node_id[:8]} has invalid certificate " 

1005 f"for tier={peer_tier} (enforcement={enforcement})") 

1006 except Exception as e: 

1007 logger.debug(f"Certificate verification error for {node_id[:8]}: {e}") 

1008 

1009 if existing: 

1010 existing.last_seen = datetime.utcnow() 

1011 existing.url = url 

1012 existing.name = peer_data.get('name', existing.name) 

1013 existing.version = peer_data.get('version', existing.version) 

1014 existing.agent_count = peer_data.get('agent_count', existing.agent_count) 

1015 existing.post_count = peer_data.get('post_count', existing.post_count) 

1016 # Update integrity fields 

1017 if public_key: 

1018 existing.public_key = public_key 

1019 if peer_data.get('code_hash'): 

1020 existing.code_hash = peer_data['code_hash'] 

1021 if peer_data.get('version'): 

1022 existing.code_version = peer_data['version'] 

1023 if signature_valid: 

1024 existing.integrity_status = 'verified' 

1025 existing.master_key_verified = master_key_verified 

1026 if peer_data.get('release_version'): 

1027 existing.release_version = peer_data['release_version'] 

1028 # Update tier/certificate fields 

1029 existing.tier = peer_tier 

1030 if certificate: 

1031 existing.certificate_json = certificate 

1032 existing.certificate_verified = certificate_verified 

1033 # Update capability tier from HART OS equilibrium 

1034 if peer_data.get('capability_tier'): 

1035 existing.capability_tier = peer_data['capability_tier'] 

1036 if peer_data.get('enabled_features'): 

1037 existing.enabled_features_json = peer_data['enabled_features'] 

1038 # Update X25519 public key for E2E encryption 

1039 if peer_data.get('x25519_public'): 

1040 existing.x25519_public = peer_data['x25519_public'] 

1041 if existing.status == 'dead': 

1042 # Only resurrect if announcement is recent (not stale gossip) 

1043 if (datetime.utcnow() - existing.last_seen).total_seconds() < 60: 

1044 existing.status = 'active' 

1045 return False 

1046 

1047 new_peer = PeerNode( 

1048 node_id=node_id, url=url, 

1049 name=peer_data.get('name', ''), 

1050 version=peer_data.get('version', ''), 

1051 status='active', 

1052 agent_count=peer_data.get('agent_count', 0), 

1053 post_count=peer_data.get('post_count', 0), 

1054 metadata_json=peer_data.get('metadata', {}), 

1055 public_key=public_key or '', 

1056 code_hash=peer_data.get('code_hash', ''), 

1057 code_version=peer_data.get('version', ''), 

1058 integrity_status=integrity_status, 

1059 master_key_verified=master_key_verified, 

1060 release_version=peer_data.get('release_version', ''), 

1061 tier=peer_tier, 

1062 certificate_json=certificate, 

1063 certificate_verified=certificate_verified, 

1064 capability_tier=peer_data.get('capability_tier'), 

1065 enabled_features_json=peer_data.get('enabled_features'), 

1066 x25519_public=peer_data.get('x25519_public', ''), 

1067 ) 

1068 db.add(new_peer) 

1069 

1070 # ─── Seamless Mind Merge ─── 

1071 # Valid peer accepted - auto-federate so minds merge without friction. 

1072 # Connection is a breeze; the audit layer handles trust continuously. 

1073 threading.Thread( 

1074 target=self._auto_federate_peer, 

1075 args=(node_id, url), 

1076 daemon=True, 

1077 ).start() 

1078 

1079 return True 

1080 

1081 def _auto_federate_peer(self, peer_node_id: str, peer_url: str): 

1082 """Auto-follow a newly accepted peer for seamless mind merge. 

1083 Valid peers get instant bidirectional content sharing - no manual step.""" 

1084 try: 

1085 from .models import get_db 

1086 from .federation import federation 

1087 db = get_db() 

1088 try: 

1089 # Follow them (we receive their content) 

1090 federation.follow_instance(db, self.node_id, peer_node_id, peer_url) 

1091 db.commit() 

1092 logger.info(f"Mind merge: auto-federated with {peer_node_id[:8]} at {peer_url}") 

1093 except Exception as e: 

1094 db.rollback() 

1095 logger.debug(f"Auto-federation failed for {peer_node_id[:8]}: {e}") 

1096 finally: 

1097 db.close() 

1098 except Exception: 

1099 pass 

1100 

1101 # ─── Integrity Round ─── 

1102 

    def _integrity_round(self):
        """Periodic integrity check: continuous audit using ALL active nodes.

        Every node audits every other node it can reach - not just one random peer.
        Valid connections are a breeze; continuous audit is the price of trust.

        Stages, in order: local self-checks (code + guardrails — a node that
        fails them stops gossiping entirely), per-peer guardrail audits,
        round-robin deep challenges, fraud detection, audit-dominance
        verification, and an optional central-registry ban-list pull.
        Every stage is best-effort: failures log at debug and never abort
        the round; DB writes commit per-stage with rollback on error.
        """
        # Self-check: verify own code integrity before challenging others
        try:
            from security.runtime_monitor import is_code_healthy
            if not is_code_healthy():
                logger.critical("Integrity round: local code tampered, stopping gossip")
                self.stop()
                return
        except Exception:
            pass  # Monitor unavailable — skip the self-check, keep gossiping

        # Self-check: verify own guardrail integrity
        try:
            from security.hive_guardrails import verify_guardrail_integrity
            if not verify_guardrail_integrity():
                logger.critical("Integrity round: guardrail integrity failed, stopping gossip")
                self.stop()
                return
        except Exception:
            pass  # Guardrail module unavailable — same best-effort policy

        from .models import get_db, PeerNode
        db = get_db()
        try:
            # Audit set: everyone alive, except ourselves and banned nodes.
            active_peers = db.query(PeerNode).filter(
                PeerNode.status == 'active',
                PeerNode.node_id != self.node_id,
                PeerNode.integrity_status != 'banned',
            ).all()

            if active_peers:
                from .integrity_service import IntegrityService

                # 1. Guardrail audit: re-verify ALL active peers' guardrail hashes.
                # This is the continuous audit - every node checks every other node.
                # Heartbeat after each peer: the audit does a blocking HTTP
                # probe per peer (see _audit_peer_guardrails, 5s timeout).
                for peer in active_peers:
                    self._audit_peer_guardrails(db, peer)
                    self._heartbeat()

                # 2. Deep challenge: cycle through challenge types across all peers.
                # Each peer gets a different challenge type per round (round-robin).
                challenge_types = ['agent_count_verify', 'code_hash_check',
                                   'stats_probe', 'guardrail_verify']
                for i, peer in enumerate(active_peers):
                    challenge_type = challenge_types[i % len(challenge_types)]
                    try:
                        IntegrityService.create_challenge(
                            db, self.node_id, peer.node_id,
                            peer.url, challenge_type)
                    except Exception as e:
                        logger.debug(f"Challenge to {peer.node_id[:8]} failed: {e}")
                    self._heartbeat()
                try:
                    db.commit()
                except Exception:
                    db.rollback()

                # 3. Run full fraud detection on ALL active peers
                for peer in active_peers:
                    try:
                        IntegrityService.detect_impression_anomaly(db, peer.node_id)
                        IntegrityService.detect_score_jump(db, peer.node_id)
                        IntegrityService.detect_collusion(db, peer.node_id)
                    except Exception as e:
                        logger.debug(f"Fraud detection for {peer.node_id[:8]} failed: {e}")
                try:
                    db.commit()
                except Exception:
                    db.rollback()

            # 4. Verify audit compute dominance — no node can outcompute its auditors
            try:
                from .integrity_service import IntegrityService
                for peer in active_peers:
                    IntegrityService.verify_audit_dominance(db, peer.node_id)
                db.commit()
            except Exception as e:
                db.rollback()
                logger.debug(f"Audit dominance check failed: {e}")

            # 5. Pull registry ban list if configured
            registry_url = os.environ.get('HEVOLVE_REGISTRY_URL', '')
            if registry_url:
                try:
                    from .integrity_service import IntegrityService
                    banned_ids = IntegrityService.check_registry_ban_list(registry_url)
                    if banned_ids:
                        for nid in banned_ids:
                            peer = db.query(PeerNode).filter_by(node_id=nid).first()
                            if peer and peer.integrity_status != 'banned':
                                peer.integrity_status = 'banned'
                                logger.info(f"Node {nid[:8]} banned via registry")
                        db.commit()
                except Exception as e:
                    logger.debug(f"Registry ban list check failed: {e}")

        except Exception as e:
            logger.debug(f"Integrity round error: {e}")
        finally:
            db.close()

1206 

1207 def _audit_peer_guardrails(self, db, peer): 

1208 """Re-verify a peer's guardrail hash by directly querying it. 

1209 This is the continuous audit — every node verifies every other node.""" 

1210 try: 

1211 resp = pooled_get( 

1212 f"{peer.url}/api/social/integrity/guardrail-hash", 

1213 timeout=5, 

1214 ) 

1215 if resp.status_code != 200: 

1216 return # Endpoint might not exist on older nodes 

1217 

1218 data = resp.json() 

1219 peer_hash = data.get('guardrail_hash', '') 

1220 if not peer_hash: 

1221 return 

1222 

1223 from security.hive_guardrails import get_guardrail_hash 

1224 local_hash = get_guardrail_hash() 

1225 

1226 if peer_hash != local_hash: 

1227 logger.warning( 

1228 f"Continuous audit: guardrail drift detected on " 

1229 f"{peer.node_id[:8]} — disconnecting") 

1230 from .integrity_service import IntegrityService 

1231 IntegrityService.increase_fraud_score( 

1232 db, peer.node_id, 50.0, 

1233 'Guardrail hash drift detected during continuous audit', 

1234 {'expected': local_hash[:16], 'got': peer_hash[:16]}) 

1235 # Severe: unfollow from federation immediately 

1236 try: 

1237 from .federation import federation 

1238 federation.unfollow_instance(db, self.node_id, peer.node_id) 

1239 except Exception: 

1240 pass 

1241 else: 

1242 # Peer passed — reward good behavior 

1243 from .integrity_service import IntegrityService 

1244 IntegrityService.decrease_fraud_score( 

1245 db, peer.node_id, 1.0, 

1246 'Guardrail audit passed') 

1247 except requests.RequestException: 

1248 pass # Network issue — will catch it next round 

1249 except Exception as e: 

1250 logger.debug(f"Guardrail audit for {peer.node_id[:8]} error: {e}") 

1251 

1252 def _load_peers_by_tier(self): 

1253 """Load gossip targets scoped to this node's tier.""" 

1254 from .models import get_db 

1255 db = get_db() 

1256 try: 

1257 from .hierarchy_service import HierarchyService 

1258 return HierarchyService.get_gossip_targets(db, self.node_id, self.tier) 

1259 except Exception: 

1260 return [] 

1261 finally: 

1262 db.close() 

1263 

1264 def _load_peers_from_db(self, exclude_dead=True): 

1265 from .models import get_db, PeerNode 

1266 db = get_db() 

1267 try: 

1268 q = db.query(PeerNode) 

1269 if exclude_dead: 

1270 q = q.filter(PeerNode.status != 'dead') 

1271 peers = q.all() 

1272 return [p.to_dict() for p in peers] 

1273 except Exception: 

1274 return [] 

1275 finally: 

1276 db.close() 

1277 

1278 def _get_count(self, what): 

1279 try: 

1280 from .models import get_db, User, Post 

1281 db = get_db() 

1282 try: 

1283 if what == 'agent': 

1284 return db.query(User).filter(User.user_type == 'agent').count() 

1285 elif what == 'post': 

1286 return db.query(Post).filter(Post.is_deleted == False).count() 

1287 return 0 

1288 finally: 

1289 db.close() 

1290 except Exception: 

1291 return 0 

1292 

1293 

1294# ═══════════════════════════════════════════════════════════════════════ 

1295# AutoDiscovery — Zero-Config LAN Peer Finding via UDP Broadcast 

1296# ═══════════════════════════════════════════════════════════════════════ 

1297 

class AutoDiscovery:
    """LAN-based zero-config peer discovery using UDP broadcast.

    After boot verification, broadcasts a signed beacon every 30s on UDP port 6780.
    Listens for beacons from other nodes on the same network.
    Discovered peers are fed into GossipProtocol as additional seeds.

    This is ADDITIVE — works alongside seed peers and registry.
    """

    # Magic prefix marking a datagram as one of our beacons; anything else
    # received on the port is discarded by _parse_beacon.
    BEACON_MAGIC = b'HEVOLVE_DISCO_V1'
    # Largest datagram read in a single recvfrom() call.
    MAX_PACKET_SIZE = 2048

    def __init__(self, gossip_protocol: GossipProtocol,
                 port: int = None, beacon_interval: int = None):
        """Wire discovery to a gossip instance.

        Args:
            gossip_protocol: supplies our beacon identity and receives
                discovered peers via handle_announce().
            port: UDP port override. Default: HEVOLVE_DISCOVERY_PORT env
                var if set, else the port registry's 'discovery' port.
            beacon_interval: seconds between beacons. Default:
                HEVOLVE_DISCOVERY_INTERVAL env var, else 30.
        """
        self._gossip = gossip_protocol
        from core.port_registry import get_port
        # Legacy env var takes precedence for backward compat
        _legacy = os.environ.get('HEVOLVE_DISCOVERY_PORT')
        self._port = port or (int(_legacy) if _legacy else get_port('discovery'))
        self._beacon_interval = beacon_interval or int(
            os.environ.get('HEVOLVE_DISCOVERY_INTERVAL', '30'))
        self._running = False
        self._send_thread = None
        self._recv_thread = None
        self._lock = threading.Lock()
        # node_ids already seen via LAN in this process — dedupes the
        # "found node" handling, since peers rebroadcast every interval.
        self._discovered_nodes: set = set()
        self._sock = None
        # Cached list of broadcast addresses (one per usable IPv4 NIC).
        # Refreshed on start; iterated each beacon send. Replaces the
        # naive `<broadcast>` (255.255.255.255) target which on multi-NIC
        # Windows boxes (Wi-Fi + Hyper-V + VMware + Docker virtual NICs)
        # leaves the box on a single OS-chosen interface — usually a
        # virtual subnet, not the physical LAN where peers actually live.
        self._broadcast_targets: list = []

    def start(self) -> None:
        """Start beacon sender and listener threads.

        Idempotent: a second call while running is a no-op. If the UDP
        port cannot be bound, discovery stays stopped (WARNING logged).
        """
        import socket as _socket
        with self._lock:
            if self._running:
                return
            self._running = True

        try:
            self._sock = _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM)
            self._sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_BROADCAST, 1)
            self._sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
            self._sock.bind(('', self._port))
            self._sock.settimeout(2.0)  # recvfrom wakes every 2s to heartbeat
        except OSError as e:
            logger.warning(f"AutoDiscovery: cannot bind UDP port {self._port}: {e}")
            # FIX: close the half-initialized socket so the file descriptor
            # is not leaked when setsockopt/bind fails after creation.
            if self._sock is not None:
                try:
                    self._sock.close()
                except Exception:
                    pass
                self._sock = None
            self._running = False
            return

        # Enumerate per-NIC broadcast addresses. Always include the
        # limited-broadcast 255.255.255.255 as a fallback.
        self._broadcast_targets = self._enumerate_broadcast_targets()
        logger.info(f"AutoDiscovery broadcast targets: "
                    f"{', '.join(self._broadcast_targets) or '<broadcast>'}")

        self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
        self._recv_thread.start()
        self._send_thread = threading.Thread(target=self._send_loop, daemon=True)
        self._send_thread.start()
        logger.info(f"AutoDiscovery started on UDP port {self._port} "
                    f"(interval={self._beacon_interval}s)")

    # NIC name patterns that indicate a virtual/tunnel adapter we
    # should NOT broadcast onto. Case-insensitive substring match.
    # Catches WSL/Hyper-V vSwitch, VMware/VirtualBox host-only adapters,
    # Bluetooth PAN, Docker bridges, and Windows loopback.
    _VIRTUAL_NIC_HINTS = (
        'loopback', 'pseudo', 'bluetooth', 'vethernet', 'wsl',
        'hyper-v', 'vmware', 'virtualbox', 'vbox', 'docker',
        'tap-', 'tun', 'npcap',
    )

    @staticmethod
    def _derive_broadcast(addr: str, netmask: str) -> str:
        """Compute IPv4 broadcast = addr | ~netmask. Returns '' on parse failure."""
        try:
            a = [int(x) for x in addr.split('.')]
            m = [int(x) for x in netmask.split('.')]
            if len(a) != 4 or len(m) != 4:
                return ''
            bcast = [(a[i] | (~m[i] & 0xFF)) for i in range(4)]
            return '.'.join(str(b) for b in bcast)
        except Exception:
            return ''

    def _enumerate_broadcast_targets(self) -> list:
        """Return one broadcast address per usable IPv4 NIC.

        On Windows, ``sock.sendto((b'…', '<broadcast>', port))`` only
        traverses the OS-chosen default-route interface. On boxes with
        multiple physical/virtual NICs this is roulette — the beacon
        often leaves on a virtual NIC the LAN peers aren't on.

        Implementation notes:
        - psutil returns ``snic.broadcast = None`` on Windows even for
          NICs with valid IPv4 addresses, so we derive broadcast from
          ``address | ~netmask`` ourselves.
        - We skip virtual / tunnel NICs by name (Bluetooth, vEthernet,
          WSL, Hyper-V, VMware, Docker, loopback) so a beacon never
          leaks into a virtual subnet our LAN peers aren't on.
        - Fallback: if no usable NIC is found, return the limited
          broadcast so degraded environments still emit something.
        """
        try:
            import psutil
        except ImportError:
            return ['255.255.255.255']
        targets = []
        try:
            stats = {}
            try:
                stats = psutil.net_if_stats()
            except Exception:
                pass
            for nic_name, addrs in psutil.net_if_addrs().items():
                # Skip virtual/tunnel NICs by name pattern.
                lower_name = nic_name.lower()
                if any(hint in lower_name for hint in self._VIRTUAL_NIC_HINTS):
                    continue
                # Skip if the NIC is down (when stats are available).
                nic_stat = stats.get(nic_name)
                if nic_stat is not None and not nic_stat.isup:
                    continue
                for snic in addrs:
                    if getattr(snic, 'family', None) is None:
                        continue
                    fam_val = int(snic.family) if hasattr(snic.family, 'value') else snic.family
                    if fam_val != 2:  # AF_INET
                        continue
                    addr = snic.address or ''
                    netmask = snic.netmask or ''
                    bcast = snic.broadcast or ''
                    # Skip loopback (127.x) and APIPA (169.254.x).
                    if addr.startswith('127.') or addr.startswith('169.254.'):
                        continue
                    # Derive broadcast on Windows (psutil leaves it None).
                    if not bcast and netmask:
                        bcast = self._derive_broadcast(addr, netmask)
                    if not bcast:
                        continue
                    if bcast in ('0.0.0.0', '255.255.255.255'):
                        continue  # Treat as "no useful broadcast"
                    if bcast not in targets:
                        targets.append(bcast)
        except Exception as e:
            logger.debug(f"AutoDiscovery NIC enumeration error: {e}")
        # Always keep limited broadcast as a final fallback so a host
        # without psutil-readable NICs (rare degraded environments) still
        # gets at least one outbound packet.
        if '255.255.255.255' not in targets:
            targets.append('255.255.255.255')
        return targets

    def stop(self) -> None:
        """Stop discovery threads and close socket.

        Closing the socket also unblocks _recv_loop's recvfrom() with an
        OSError, which the loop treats as shutdown once _running is False.
        """
        with self._lock:
            self._running = False
            if self._sock:
                try:
                    self._sock.close()
                except Exception:
                    pass

    def _build_beacon(self) -> bytes:
        """Build a signed beacon packet: MAGIC + JSON payload.

        Every enrichment (guardrail hash, code hash, release version,
        X25519 key, robot capabilities, Ed25519 signature) is best-effort:
        a missing security/robotics module just leaves that field out.
        """
        import json as _json
        payload = {
            'type': 'hevolve-discovery',
            'node_id': self._gossip.node_id,
            'url': self._gossip.base_url,
            'name': self._gossip.node_name,
            'version': self._gossip.version,
            'tier': self._gossip.tier,
            'timestamp': int(time.time()),
        }
        try:
            from security.hive_guardrails import get_guardrail_hash
            payload['guardrail_hash'] = get_guardrail_hash()
        except Exception:
            pass
        try:
            # FIX: dropped an unused sign_json_payload import here — signing
            # happens at the end, with its own import, so the signature
            # covers every field added above.
            from security.node_integrity import get_public_key_hex, compute_code_hash
            payload['public_key'] = get_public_key_hex()
            payload['code_hash'] = compute_code_hash()
        except Exception:
            pass
        # Include release version if manifest available
        try:
            from security.master_key import load_release_manifest
            manifest = load_release_manifest()
            if manifest:
                payload['release_version'] = manifest.get('version', '')
        except Exception:
            pass
        # Include X25519 public key for E2E encryption
        try:
            from security.channel_encryption import get_x25519_public_hex
            payload['x25519_public'] = get_x25519_public_hex()
        except Exception:
            pass
        # Include robot capabilities for fleet dispatch
        try:
            from integrations.robotics.capability_advertiser import (
                get_capability_advertiser,
            )
            adv = get_capability_advertiser()
            payload['robot_capabilities'] = adv.get_gossip_payload()
        except Exception:
            pass
        # Sign last so the signature covers all fields assembled above.
        try:
            from security.node_integrity import sign_json_payload
            payload['signature'] = sign_json_payload(payload)
        except Exception:
            pass

        json_bytes = _json.dumps(payload, separators=(',', ':')).encode('utf-8')
        return self.BEACON_MAGIC + json_bytes

    def _parse_beacon(self, data: bytes) -> dict:
        """Parse and verify a beacon packet. Returns payload dict or empty dict.

        Rejects: wrong magic / malformed JSON, wrong type, missing node_id,
        our own echoed beacon, guardrail-hash mismatch, unknown code hash
        (hard enforcement only), invalid Ed25519 signature, stale beacons.
        """
        import json as _json
        if not data.startswith(self.BEACON_MAGIC):
            return {}
        try:
            json_bytes = data[len(self.BEACON_MAGIC):]
            payload = _json.loads(json_bytes.decode('utf-8'))
        except (ValueError, UnicodeDecodeError):
            return {}

        if payload.get('type') != 'hevolve-discovery':
            return {}
        # FIX: require a node_id. A beacon without one used to pass through
        # and crash _recv_loop on `node_id[:8]` (slicing None), silently
        # killing the listener thread.
        sender_id = payload.get('node_id')
        if not sender_id:
            return {}
        if sender_id == self._gossip.node_id:
            return {}  # Our own beacon echoed back on the LAN

        # Verify guardrail hash
        peer_hash = payload.get('guardrail_hash', '')
        if peer_hash:
            try:
                from security.hive_guardrails import get_guardrail_hash
                if peer_hash != get_guardrail_hash():
                    logger.debug(f"AutoDiscovery: rejecting beacon from "
                                 f"{payload.get('node_id', '?')[:8]}: guardrail mismatch")
                    return {}
            except Exception:
                pass

        # Verify code hash against release hash registry
        peer_code_hash = payload.get('code_hash', '')
        if peer_code_hash:
            try:
                from security.release_hash_registry import get_release_hash_registry
                from security.master_key import get_enforcement_mode
                registry = get_release_hash_registry()
                if not registry.is_known_release_hash(peer_code_hash):
                    enforcement = get_enforcement_mode()
                    if enforcement == 'hard':
                        logger.warning(
                            f"AutoDiscovery: rejecting beacon from "
                            f"{payload.get('node_id', '?')[:8]}: "
                            f"unknown code hash {peer_code_hash[:16]}...")
                        return {}
                    elif enforcement in ('soft', 'warn'):
                        logger.info(
                            f"AutoDiscovery: unknown code hash from "
                            f"{payload.get('node_id', '?')[:8]} "
                            f"(enforcement={enforcement})")
            except Exception:
                pass

        # Verify Ed25519 signature
        sig = payload.get('signature')
        pubkey = payload.get('public_key')
        if sig and pubkey:
            try:
                from security.node_integrity import verify_json_signature
                clean = {k: v for k, v in payload.items() if k != 'signature'}
                if not verify_json_signature(pubkey, clean, sig):
                    logger.warning(f"AutoDiscovery: invalid signature from "
                                   f"{payload.get('node_id', '?')[:8]}")
                    return {}
            except Exception:
                pass

        # Reject stale beacons (> 5 minutes old)
        ts = payload.get('timestamp', 0)
        if abs(time.time() - ts) > 300:
            return {}

        return payload

    def _send_loop(self) -> None:
        """Periodically broadcast beacon on LAN.

        Sleeps via ``NodeWatchdog.sleep_with_heartbeat`` so an interval
        longer than the watchdog's frozen threshold (30s × 10 = 300s by
        default) can't age the heartbeat out mid-sleep. The default
        beacon interval is well below the threshold, but running with
        HEVOLVE_BEACON_INTERVAL=600 (or similar operator overrides) used
        to trigger the restart cascade documented in the 2026-04-11
        incident.
        """
        while self._running:
            try:
                beacon = self._build_beacon()
                # Send the beacon to every usable per-NIC broadcast
                # address (Win11 multi-NIC: Wi-Fi + Hyper-V + VMware +
                # Docker virtuals). A failure on one NIC must not
                # abort the round.
                targets = self._broadcast_targets or ['255.255.255.255']
                for tgt in targets:
                    try:
                        self._sock.sendto(beacon, (tgt, self._port))
                    except Exception as e:
                        logger.debug(f"AutoDiscovery send to {tgt} failed: {e}")
            except Exception as e:
                logger.debug(f"AutoDiscovery send error: {e}")
            try:
                from security.node_watchdog import get_watchdog
                wd = get_watchdog()
                if wd is not None:
                    wd.sleep_with_heartbeat(
                        'auto_discovery', self._beacon_interval,
                        stop_check=lambda: not self._running,
                    )
                    continue
            except Exception:
                pass
            # Fallback when the watchdog is unavailable: plain sleep.
            # (No heartbeat on this path — with no watchdog there is
            # nothing to heartbeat to.)
            time.sleep(self._beacon_interval)

    def _recv_loop(self) -> None:
        """Listen for beacons from other nodes on the network.

        The socket has a 2s timeout (set in start()) so recvfrom wakes
        regularly and we can heartbeat between calls. The heartbeat is
        emitted on BOTH timeout and successful receipt — heartbeating
        only on timeout would let the heartbeat age on a node that keeps
        receiving packets every 2s while the recv path blocks.
        """
        import socket as _socket

        def _wd_heartbeat_safe():
            # Best-effort watchdog heartbeat; never let it raise here.
            try:
                from security.node_watchdog import get_watchdog
                wd = get_watchdog()
                if wd:
                    wd.heartbeat('auto_discovery')
            except Exception:
                pass

        while self._running:
            try:
                data, addr = self._sock.recvfrom(self.MAX_PACKET_SIZE)
            except _socket.timeout:
                _wd_heartbeat_safe()
                continue
            except OSError:
                # stop() closed the socket, or transient socket error.
                if not self._running:
                    break
                continue
            # Successful receipt — refresh heartbeat before processing
            # the payload, which involves JSON parsing + gossip handoff
            # and could itself block for a noticeable fraction of a second.
            _wd_heartbeat_safe()

            payload = self._parse_beacon(data)
            if not payload:
                continue

            # _parse_beacon guarantees a non-empty node_id here.
            node_id = payload.get('node_id')
            if node_id in self._discovered_nodes:
                continue

            self._discovered_nodes.add(node_id)
            url = payload.get('url', '')
            logger.info(f"AutoDiscovery: found node "
                        f"{payload.get('name', node_id[:8])} at {url} via LAN")

            # Feed into gossip
            try:
                self._gossip.handle_announce(payload)
            except Exception:
                pass
            try:
                self._gossip._announce_to_peer(url)
            except Exception:
                pass

1695 

1696 

# Module-level singletons — created at import time so every importer of
# this module shares one gossip identity and one LAN discovery service.
# AutoDiscovery is wired to the same gossip instance it feeds peers into.
gossip = GossipProtocol()
auto_discovery = AutoDiscovery(gossip)

1700 

1701 

def get_peer_discovery() -> GossipProtocol:
    """Accessor for the module-level GossipProtocol singleton.

    Lets callers (e.g. `integrations.agent_engine.compute_borrowing` for
    compute-offer broadcast) obtain the shared gossip instance without
    importing the module-level binding directly. Every call returns the
    same object, so the node's gossip identity stays stable.
    """
    return gossip

1711 

1712 

def get_auto_discovery() -> AutoDiscovery:
    """Accessor for the module-level AutoDiscovery singleton.

    Used by standalone runners (systemd unit, NixOS module) so they pick
    up the AutoDiscovery wired to the shared gossip here instead of
    constructing a second instance with its own gossip.
    """
    return auto_discovery