Coverage for core / peer_link / link_manager.py: 79.0%

233 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2PeerLink Manager — manages all active peer connections. 

3 

4Auto-upgrade policy: 

5 After N successful gossip HTTP exchanges with a peer, offer PeerLink upgrade. 

6 

7Connection budget (tier-based): 

8 flat: max 10 links (bandwidth-conscious home devices) 

9 regional: max 50 links (relay capacity) 

10 central: max 200 links (hub capacity) 

11 

12Idle pruning: close links with <1 message in 5 minutes. 

13Priority: keep links to peers with GPU, loaded models. 

14 

15HTTP fallback: send(peer_id, channel, data) tries PeerLink first, 

16falls back to HTTP POST if no link available. 

17""" 

18import json 

19import logging 

20import os 

21import threading 

22import time 

23from typing import Any, Callable, Dict, List, Optional 

24 

25from .link import PeerLink, TrustLevel, LinkState 

26 

# Logger shared by the peer-link subsystem.
logger = logging.getLogger('hevolve.peer_link')

# Maximum number of simultaneous links allowed per node tier.
_MAX_LINKS = dict(flat=10, regional=50, central=200)

# Number of successful HTTP exchanges with a peer before a PeerLink
# upgrade is attempted.
_UPGRADE_THRESHOLD = 3

# Links with no activity for longer than this many seconds are pruned.
_IDLE_TIMEOUT = 5 * 60

# Bounds (in seconds) of the exponential reconnect backoff.
_RECONNECT_MIN, _RECONNECT_MAX = 5, 120

45 

46 

class PeerLinkManager:
    """Manages all peer connections. Singleton via get_link_manager().

    Threading model: ``self._lock`` is a plain (non-reentrant)
    ``threading.Lock`` guarding ``_links``, ``_channel_handlers`` and
    ``_http_exchange_counts``. Methods that acquire it must never be called
    while it is already held by the same thread; helpers that take the lock
    themselves say so in their docstring.
    """

    def __init__(self):
        self._links: Dict[str, PeerLink] = {}  # peer_id -> PeerLink
        self._lock = threading.Lock()
        self._running = False
        self._maintenance_thread: Optional[threading.Thread] = None
        # peer_id -> successful HTTP exchanges since the last upgrade attempt
        self._http_exchange_counts: Dict[str, int] = {}
        # channel -> handlers applied to every current and future link
        self._channel_handlers: Dict[str, List[Callable]] = {}
        # peer_id -> current backoff duration (seconds)
        self._reconnect_backoff: Dict[str, float] = {}
        # peer_id -> timestamp of the last reconnect attempt
        self._reconnect_last_attempt: Dict[str, float] = {}

        # Determine connection budget from tier; fall back to the smallest
        # budget when the security package is not importable.
        try:
            from security.key_delegation import get_node_tier
            tier = get_node_tier()
        except ImportError:
            tier = 'flat'
        self._max_links = _MAX_LINKS.get(tier, 10)
        self._tier = tier

    def start(self):
        """Start the link manager background maintenance (idempotent)."""
        if self._running:
            return
        self._running = True
        self._maintenance_thread = threading.Thread(
            target=self._maintenance_loop, daemon=True,
            name='peerlink-maintenance')
        self._maintenance_thread.start()
        logger.info(f"PeerLinkManager started (tier={self._tier}, "
                    f"max_links={self._max_links})")

    def stop(self):
        """Stop manager and close all links.

        Links are detached from the registry under the lock and closed
        afterwards, so one failing ``close()`` cannot prevent the remaining
        links from being closed or the maintenance thread from being joined.
        """
        self._running = False
        with self._lock:
            links = list(self._links.values())
            self._links.clear()
            self._reconnect_backoff.clear()
            self._reconnect_last_attempt.clear()

        for link in links:
            try:
                link.close()
            except Exception as e:
                logger.debug(f"Error closing link during stop: {e}")

        thread = self._maintenance_thread
        # Joining the current thread raises RuntimeError, so skip the join
        # if stop() is ever invoked from the maintenance thread itself.
        if thread and thread is not threading.current_thread():
            thread.join(timeout=10)

    # --- Link Access ---------------------------------------------------

    def get_link(self, peer_id: str) -> Optional[PeerLink]:
        """Get active (connected) link to a peer, or None."""
        with self._lock:
            link = self._links.get(peer_id)
            if link and link.is_connected:
                return link
        return None

    def has_link(self, peer_id: str) -> bool:
        """Check if an active link exists to a peer."""
        return self.get_link(peer_id) is not None

    # --- Send with Fallback --------------------------------------------

    def send(self, peer_id: str, channel: str, data: dict,
             peer_url: str = '', wait_response: bool = False,
             timeout: float = 30.0) -> Optional[dict]:
        """Send message to peer. PeerLink first, HTTP fallback.

        Args:
            peer_id: Target peer node_id
            channel: Channel name
            data: Message payload
            peer_url: HTTP URL for fallback (if no PeerLink)
            wait_response: Block until response
            timeout: Max wait time

        Returns:
            The PeerLink result when a link handled the message, otherwise
            the HTTP fallback response (or None when no ``peer_url`` was
            given or the fallback failed).
        """
        # Try PeerLink first
        link = self.get_link(peer_id)
        if link:
            result = link.send(channel, data, wait_response=wait_response,
                               timeout=timeout)
            if result is not None or not wait_response:
                return result
            # A response was expected but none arrived: fall through to HTTP

        # HTTP fallback
        if not peer_url:
            return None

        return self._http_fallback(peer_url, channel, data, timeout)

    def broadcast(self, channel: str, data: dict,
                  trust_filter: Optional[TrustLevel] = None) -> int:
        """Broadcast message to all connected peers.

        Args:
            channel: Channel name
            data: Message payload
            trust_filter: Only send to links with this trust level

        Returns:
            Number of links whose ``send`` call completed without raising
        """
        with self._lock:
            links = list(self._links.values())

        sent = 0
        for link in links:
            if not link.is_connected:
                continue
            # Compare against None explicitly so a falsy trust level (e.g. a
            # zero-valued enum member) is still honoured as a filter.
            if trust_filter is not None and link.trust != trust_filter:
                continue
            try:
                link.send(channel, data)
                sent += 1
            except Exception as e:
                logger.debug(f"Broadcast on '{channel}' failed for one link: {e}")
        return sent

    def collect(self, channel: str, timeout_ms: int = 1000) -> List[dict]:
        """Broadcast and collect responses from all peers.

        Used by HiveMind for distributed thought fusion.

        Peers are queried one after another, each with the full timeout, so
        the worst-case duration is ``len(links) * timeout_ms``.
        """
        with self._lock:
            links = list(self._links.values())

        # Send query and collect responses
        timeout_s = timeout_ms / 1000.0
        responses = []
        for link in links:
            if not link.is_connected:
                continue
            try:
                result = link.send(channel, {'type': 'query'},
                                   wait_response=True, timeout=timeout_s)
            except Exception as e:
                logger.debug(f"Collect on '{channel}' failed for one link: {e}")
                continue
            if result:
                responses.append(result)

        return responses

    # --- Link Management -----------------------------------------------

    def upgrade_peer(self, peer_id: str, address: str,
                     trust: TrustLevel,
                     x25519_public: str = '',
                     ed25519_public: str = '') -> bool:
        """Upgrade a peer from HTTP to persistent PeerLink.

        Called when auto-upgrade threshold reached or manually.

        Returns:
            True if the peer is (now or already) connected via PeerLink,
            False if the budget is exhausted or the connection failed.
        """
        with self._lock:
            # Already linked: nothing to do, and no reason to evict anyone.
            existing = self._links.get(peer_id)
            if existing and existing.is_connected:
                return True

            active = sum(1 for link in self._links.values()
                         if link.is_connected)
            # Snapshot handlers so they can be attached without the lock and
            # without racing register_channel_handler().
            handlers = [
                (channel, handler)
                for channel, channel_handlers in self._channel_handlers.items()
                for handler in channel_handlers
            ]

        # Check budget. Eviction must happen OUTSIDE the lock: the helper
        # acquires self._lock itself and threading.Lock is not reentrant.
        if active >= self._max_links and not self._evict_weakest_link():
            logger.debug(f"Link budget full ({active}/{self._max_links}), "
                         f"cannot upgrade {peer_id[:8]}")
            return False

        # Create and connect
        link = PeerLink(
            peer_id=peer_id,
            address=address,
            trust=trust,
            x25519_public_hex=x25519_public,
            ed25519_public_hex=ed25519_public,
        )

        # Register channel handlers
        for channel, handler in handlers:
            link.on_message(channel, handler)

        if not link.connect():
            return False

        with self._lock:
            displaced = self._links.get(peer_id)
            self._links[peer_id] = link

        # A previous (stale) link object for this peer is no longer tracked
        # by the manager; close it so it cannot linger unmanaged.
        if displaced is not None and displaced is not link:
            try:
                displaced.close()
            except Exception as e:
                logger.debug(f"Error closing replaced link {peer_id[:8]}: {e}")

        self._notify_peer_upgraded(peer_id)
        return True

    @staticmethod
    def _notify_peer_upgraded(peer_id: str) -> None:
        """Run best-effort side effects after a peer gained a PeerLink.

        Neither step may fail the upgrade: every error is logged and ignored.
        """
        # On the flat-tier host (Nunba), a new persistent peer means
        # the WAMP router is now needed for realtime push to this
        # mobile device. Safe no-op when Nunba boot already started
        # the router or when wamp_router module isn't present (HARTOS
        # core used in regional/central deployments that manage their
        # own router elsewhere).
        try:
            from wamp_router import ensure_wamp_running  # type: ignore
            ensure_wamp_running(reason=f"peer {peer_id[:8]} upgraded")
        except ImportError:
            pass
        except Exception as e:
            logger.debug(f"ensure_wamp_running skipped: {e}")

        # Best-effort HiveMind enrolment for MoE consensus. The
        # bridge's `register_peer_agent` is a no-op when hevolveai
        # isn't loaded (central HTTP-only tier) so this is safe
        # everywhere and never blocks the link upgrade.
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            bridge = get_world_model_bridge()
            if bridge and hasattr(bridge, 'register_peer_agent'):
                bridge.register_peer_agent(peer_id=peer_id)
        except Exception as e:
            logger.debug(
                f"hive register_peer_agent skipped for "
                f"{peer_id[:8] if peer_id else '?'}: {e}"
            )

    def close_link(self, peer_id: str):
        """Close and remove a link, dropping its reconnect bookkeeping.

        Acquires ``self._lock``; do not call while holding it.
        """
        with self._lock:
            link = self._links.pop(peer_id, None)
            # Forget backoff state so a future link to the same peer starts
            # from the minimum backoff instead of inheriting stale values.
            self._reconnect_backoff.pop(peer_id, None)
            self._reconnect_last_attempt.pop(peer_id, None)
        if link:
            link.close()

    def register_channel_handler(self, channel: str, handler: Callable):
        """Register handler for incoming messages on a channel.

        Applied to all current and future links.
        """
        with self._lock:
            self._channel_handlers.setdefault(channel, []).append(handler)

            # Apply to existing links
            for link in self._links.values():
                link.on_message(channel, handler)

    def record_http_exchange(self, peer_id: str):
        """Record a successful HTTP exchange with a peer.

        When threshold reached, auto-upgrade to PeerLink.
        Called by gossip, federation, etc. after successful HTTP call.
        """
        # The counter is updated under the lock so concurrent callers cannot
        # lose increments; the upgrade itself runs outside the lock.
        with self._lock:
            count = self._http_exchange_counts.get(peer_id, 0) + 1
            threshold_reached = count >= _UPGRADE_THRESHOLD
            self._http_exchange_counts[peer_id] = (
                0 if threshold_reached else count)

        if threshold_reached:
            self._try_auto_upgrade(peer_id)

    # --- Status --------------------------------------------------------

    def get_status(self) -> dict:
        """Return a snapshot of manager configuration and per-link stats."""
        with self._lock:
            links = {pid: link.get_stats()
                     for pid, link in self._links.items()}

        active = sum(1 for s in links.values()
                     if s.get('state') == 'connected')
        encrypted = sum(1 for s in links.values() if s.get('encrypted'))

        return {
            'running': self._running,
            'tier': self._tier,
            'max_links': self._max_links,
            'active_links': active,
            'encrypted_links': encrypted,
            'total_links': len(links),
            'links': links,
        }

    # --- Internal ------------------------------------------------------

    def _maintenance_loop(self):
        """Background: prune idle links and attempt reconnects."""
        while self._running:
            try:
                self._prune_idle_links()
                self._attempt_reconnects()
            except Exception as e:
                logger.debug(f"Maintenance error: {e}")

            # Sleep in small increments to allow clean shutdown
            for _ in range(30):  # 30 seconds
                if not self._running:
                    break
                time.sleep(1)

    def _prune_idle_links(self):
        """Close links that haven't had activity in _IDLE_TIMEOUT seconds."""
        with self._lock:
            to_close = [
                peer_id for peer_id, link in self._links.items()
                if link.is_connected and link.idle_seconds > _IDLE_TIMEOUT
            ]

        for peer_id in to_close:
            logger.info(f"Pruning idle link: {peer_id[:8]}")
            self.close_link(peer_id)

    def _attempt_reconnects(self):
        """Try to reconnect links that dropped, with exponential backoff."""
        now = time.time()
        with self._lock:
            disconnected = [
                (pid, link) for pid, link in self._links.items()
                if link.state == LinkState.DISCONNECTED
            ]

        for peer_id, link in disconnected:
            backoff = self._reconnect_backoff.get(peer_id, _RECONNECT_MIN)
            last_attempt = self._reconnect_last_attempt.get(peer_id, 0)
            if now < last_attempt + backoff:
                continue

            # The link may have been closed or replaced since the snapshot;
            # reconnecting it would resurrect a connection nobody tracks.
            with self._lock:
                still_managed = self._links.get(peer_id) is link
            if not still_managed:
                continue

            self._reconnect_last_attempt[peer_id] = now
            if link.connect():
                self._reconnect_backoff.pop(peer_id, None)
                self._reconnect_last_attempt.pop(peer_id, None)
            else:
                # Exponential backoff (duration doubles, capped at max)
                self._reconnect_backoff[peer_id] = min(backoff * 2,
                                                       _RECONNECT_MAX)

    def _evict_weakest_link(self) -> bool:
        """Evict the least useful connected link to make room.

        Acquires ``self._lock`` (directly and via ``close_link``), so the
        caller must NOT hold it.

        Returns:
            True if a link was evicted, False if no link is connected.
        """
        with self._lock:
            candidates = [
                (pid, link) for pid, link in self._links.items()
                if link.is_connected
            ]

        if not candidates:
            return False

        # Score: lower = less useful
        def score(item):
            _, link = item
            s = 0
            if link.capabilities.get('gpu'):
                s += 10  # GPU peers are valuable
            s -= link.idle_seconds / 60  # Penalize idle
            s += link._messages_received / 100  # Active peers are valuable
            return s

        weakest_id, _ = min(candidates, key=score)
        self.close_link(weakest_id)
        return True

    @staticmethod
    def _determine_trust(peer_id: str, peer_info: dict) -> TrustLevel:
        """Pick the trust level for a peer from user identity, not network.

        Same user = same authenticated user_id across ANY network. Any lookup
        failure leaves the default (``TrustLevel.PEER``) in place.
        """
        try:
            # Check compute_mesh (same-user device registry)
            from integrations.agent_engine.compute_mesh_service import get_compute_mesh
            mesh = get_compute_mesh()
            if mesh and peer_id in (mesh._peers or {}):
                return TrustLevel.SAME_USER
        except Exception:
            pass

        try:
            # Check if peer's user_id matches ours (regional/WAN same-user)
            peer_user_id = peer_info.get('user_id', '')
            if peer_user_id:
                local_user_id = os.environ.get('HEVOLVE_USER_ID', '')
                if not local_user_id:
                    from security.node_integrity import get_node_identity
                    local_user_id = get_node_identity().get('user_id', '')
                if local_user_id and peer_user_id == local_user_id:
                    return TrustLevel.SAME_USER
        except Exception:
            pass

        return TrustLevel.PEER  # Default to encrypted

    def _try_auto_upgrade(self, peer_id: str):
        """Auto-upgrade a peer from HTTP to PeerLink.

        Best-effort: any failure is logged at debug level and swallowed so
        the caller's HTTP path is never disturbed.
        """
        try:
            # Look up peer info from gossip
            from integrations.social.peer_discovery import gossip
            peers = gossip.get_peer_list()
            peer_info = next(
                (p for p in peers if p.get('node_id') == peer_id), None)
            if not peer_info:
                return

            address = (peer_info.get('url', '')
                       .replace('http://', '')
                       .replace('https://', '')
                       .rstrip('/'))
            if not address:
                return

            self.upgrade_peer(
                peer_id=peer_id,
                address=address,
                trust=self._determine_trust(peer_id, peer_info),
                x25519_public=peer_info.get('x25519_public', ''),
                ed25519_public=peer_info.get('public_key', ''),
            )
        except Exception as e:
            logger.debug(f"Auto-upgrade failed for {peer_id[:8]}: {e}")

    @staticmethod
    def _http_fallback(peer_url: str, channel: str, data: dict,
                       timeout: float = 30.0) -> Optional[dict]:
        """Send via HTTP when no PeerLink available.

        Returns:
            The decoded JSON body on HTTP 200, otherwise None (non-200
            status, transport error, or undecodable body).
        """
        try:
            from core.http_pool import pooled_post
            resp = pooled_post(
                # Tolerate a trailing slash so the path never becomes '//api'.
                f"{peer_url.rstrip('/')}/api/peer-link/message",
                json={'ch': channel, 'd': data},
                timeout=timeout,
            )
            if resp.status_code == 200:
                return resp.json()
        except Exception as e:
            logger.debug(f"HTTP fallback to {peer_url} failed: {e}")
        return None

463 

464 

465# --- Singleton --------------------------------------------------------- 

466 

# Process-wide manager instance, created on first use.
_manager: Optional[PeerLinkManager] = None
# Serialises creation of the instance above.
_manager_lock = threading.Lock()


def get_link_manager() -> PeerLinkManager:
    """Return the shared PeerLinkManager, creating it on first call."""
    global _manager
    # Fast path: the instance already exists, no locking needed.
    if _manager is not None:
        return _manager
    with _manager_lock:
        # Re-check under the lock: another thread may have created it
        # between the unlocked check and acquiring the lock.
        if _manager is None:
            _manager = PeerLinkManager()
        return _manager

479 

480 

def reset_link_manager():
    """Reset singleton (testing only).

    The instance is detached under ``_manager_lock`` (the same lock
    ``get_link_manager`` uses for creation) and stopped afterwards, so the
    singleton is cleared even if ``stop()`` raises and the lock is not held
    while links are being closed.
    """
    global _manager
    with _manager_lock:
        manager, _manager = _manager, None
    if manager is not None:
        manager.stop()