Coverage for core / peer_link / link_manager.py: 79.0%
233 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2PeerLink Manager — manages all active peer connections.
4Auto-upgrade policy:
5 After N successful gossip HTTP exchanges with a peer, offer PeerLink upgrade.
7Connection budget (tier-based):
8 flat: max 10 links (bandwidth-conscious home devices)
9 regional: max 50 links (relay capacity)
10 central: max 200 links (hub capacity)
12Idle pruning: close links with <1 message in 5 minutes.
13Priority: keep links to peers with GPU, loaded models.
15HTTP fallback: send(peer_id, channel, data) tries PeerLink first,
16falls back to HTTP POST if no link available.
17"""
18import json
19import logging
20import os
21import threading
22import time
23from typing import Any, Callable, Dict, List, Optional
25from .link import PeerLink, TrustLevel, LinkState
logger = logging.getLogger('hevolve.peer_link')

# Upper bound on simultaneously connected PeerLinks, per node tier:
# home devices stay small, relays and hubs get progressively more.
_MAX_LINKS = dict(
    flat=10,
    regional=50,
    central=200,
)

# Number of successful HTTP exchanges with a peer before an automatic
# PeerLink upgrade is attempted.
_UPGRADE_THRESHOLD = 3

# A connected link with no activity for longer than this (seconds) is pruned.
_IDLE_TIMEOUT = 5 * 60

# Reconnect backoff bounds in seconds: start at the minimum, double on each
# failed attempt, never exceed the maximum.
_RECONNECT_MIN, _RECONNECT_MAX = 5, 120
class PeerLinkManager:
    """Manages all peer connections. Singleton via get_link_manager().

    Locking: ``self._lock`` is a plain, non-reentrant ``threading.Lock``
    guarding ``_links``, ``_channel_handlers`` and ``_http_exchange_counts``.
    No method calls another lock-taking manager method while holding it,
    and link ``connect``/``close``/``send`` calls run outside the lock.
    """

    def __init__(self):
        self._links: Dict[str, PeerLink] = {}  # peer_id -> PeerLink
        self._lock = threading.Lock()
        self._running = False
        self._maintenance_thread: Optional[threading.Thread] = None
        # peer_id -> successful HTTP exchanges since the last upgrade attempt
        self._http_exchange_counts: Dict[str, int] = {}
        # channel -> handlers applied to every current and future link
        self._channel_handlers: Dict[str, List[Callable]] = {}
        # peer_id -> backoff duration (seconds) before the next reconnect
        self._reconnect_backoff: Dict[str, float] = {}
        # peer_id -> time.time() of the last reconnect attempt
        self._reconnect_last_attempt: Dict[str, float] = {}

        # Determine connection budget from tier; fall back to the most
        # conservative ('flat') budget when the tier cannot be determined.
        try:
            from security.key_delegation import get_node_tier
            tier = get_node_tier()
        except ImportError:
            tier = 'flat'
        except Exception as e:
            logger.debug(f"get_node_tier failed, assuming 'flat': {e}")
            tier = 'flat'
        self._max_links = _MAX_LINKS.get(tier, 10)
        self._tier = tier

    def start(self):
        """Start the link manager background maintenance thread (idempotent)."""
        if self._running:
            return
        self._running = True
        self._maintenance_thread = threading.Thread(
            target=self._maintenance_loop, daemon=True,
            name='peerlink-maintenance')
        self._maintenance_thread.start()
        logger.info(f"PeerLinkManager started (tier={self._tier}, "
                    f"max_links={self._max_links})")

    def stop(self):
        """Stop the maintenance thread and close every link.

        Links are detached under the lock and closed outside it, so a slow
        or failing ``close()`` neither blocks other callers nor leaves the
        link table half-cleared.
        """
        self._running = False
        with self._lock:
            links = list(self._links.values())
            self._links.clear()
        for link in links:
            try:
                link.close()
            except Exception as e:
                logger.debug(f"Error closing link during stop: {e}")
        self._reconnect_backoff.clear()
        self._reconnect_last_attempt.clear()
        thread = self._maintenance_thread
        # Thread.join() raises RuntimeError when called on the current thread.
        if thread and thread is not threading.current_thread():
            thread.join(timeout=10)

    # --- Link Access ---------------------------------------------------

    def get_link(self, peer_id: str) -> Optional[PeerLink]:
        """Return the link to ``peer_id`` if it is connected, else None."""
        with self._lock:
            link = self._links.get(peer_id)
        if link and link.is_connected:
            return link
        return None

    def has_link(self, peer_id: str) -> bool:
        """Check if a connected link exists to a peer."""
        return self.get_link(peer_id) is not None

    # --- Send with Fallback --------------------------------------------

    def send(self, peer_id: str, channel: str, data: dict,
             peer_url: str = '', wait_response: bool = False,
             timeout: float = 30.0) -> Optional[dict]:
        """Send message to peer. PeerLink first, HTTP fallback.

        Args:
            peer_id: Target peer node_id
            channel: Channel name
            data: Message payload
            peer_url: HTTP URL for fallback (if no PeerLink)
            wait_response: Block until response
            timeout: Max wait time

        Returns:
            Response dict if wait_response=True, else None. When the HTTP
            fallback is used, its parsed JSON body (or None) is returned.
        """
        link = self.get_link(peer_id)
        if link:
            try:
                result = link.send(channel, data, wait_response=wait_response,
                                   timeout=timeout)
            except Exception as e:
                # A raising PeerLink must not bypass the HTTP fallback.
                logger.debug(f"PeerLink send to {peer_id[:8]} failed, "
                             f"falling back to HTTP: {e}")
            else:
                if result is not None or not wait_response:
                    return result
            # PeerLink send failed (or gave no response); fall through to HTTP

        if not peer_url:
            return None

        return self._http_fallback(peer_url, channel, data, timeout)

    def broadcast(self, channel: str, data: dict,
                  trust_filter: Optional[TrustLevel] = None) -> int:
        """Broadcast message to all connected peers.

        Args:
            channel: Channel name
            data: Message payload
            trust_filter: Only send to links with this trust level
                (None means no filtering)

        Returns:
            Number of links whose send() did not raise
        """
        with self._lock:
            links = list(self._links.values())

        sent = 0
        for link in links:
            if not link.is_connected:
                continue
            # Compare against None explicitly: an enum member may be falsy.
            if trust_filter is not None and link.trust != trust_filter:
                continue
            try:
                link.send(channel, data)
                sent += 1
            except Exception as e:
                logger.debug(f"Broadcast on {channel} failed for a link: {e}")
        return sent

    def collect(self, channel: str, timeout_ms: int = 1000) -> List[dict]:
        """Broadcast a query and collect responses from all peers.

        Used by HiveMind for distributed thought fusion. Peers are queried
        one after another and each gets its own ``timeout_ms`` budget, so the
        worst-case wall time is roughly ``len(links) * timeout_ms``.
        Falsy responses (None, empty dict) are dropped.
        """
        with self._lock:
            links = list(self._links.values())

        timeout_s = timeout_ms / 1000.0
        responses = []
        for link in links:
            if not link.is_connected:
                continue
            try:
                result = link.send(channel, {'type': 'query'},
                                   wait_response=True, timeout=timeout_s)
            except Exception as e:
                logger.debug(f"Collect on {channel} failed for a link: {e}")
                continue
            if result:
                responses.append(result)
        return responses

    # --- Link Management -----------------------------------------------

    def upgrade_peer(self, peer_id: str, address: str,
                     trust: TrustLevel,
                     x25519_public: str = '',
                     ed25519_public: str = '') -> bool:
        """Upgrade a peer from HTTP to persistent PeerLink.

        Called when auto-upgrade threshold reached or manually.

        Returns:
            True if a connected link to the peer exists afterwards (already
            present, created here, or created concurrently by another
            caller); False if the budget is full and nothing could be
            evicted, or the connection attempt failed.
        """
        with self._lock:
            # Already linked: nothing to do (and nothing to evict for).
            existing = self._links.get(peer_id)
            if existing and existing.is_connected:
                return True
            active = sum(1 for l in self._links.values() if l.is_connected)
            # Snapshot handlers so registration on another thread cannot
            # mutate the dict while we iterate it below.
            handlers = [(ch, list(hs)) for ch, hs in self._channel_handlers.items()]

        if active >= self._max_links:
            # Must run without self._lock held: _evict_weakest_link() and
            # close_link() acquire the (non-reentrant) lock themselves.
            if not self._evict_weakest_link():
                logger.debug(f"Link budget full ({active}/{self._max_links}), "
                             f"cannot upgrade {peer_id[:8]}")
                return False

        link = PeerLink(
            peer_id=peer_id,
            address=address,
            trust=trust,
            x25519_public_hex=x25519_public,
            ed25519_public_hex=ed25519_public,
        )

        # Register channel handlers
        for channel, channel_handlers in handlers:
            for handler in channel_handlers:
                link.on_message(channel, handler)

        if not link.connect():
            return False

        with self._lock:
            current = self._links.get(peer_id)
            lost_race = current is not None and current.is_connected
            if not lost_race:
                self._links[peer_id] = link

        if lost_race:
            # Another caller linked this peer while we were connecting;
            # keep theirs and release ours instead of leaking it.
            link.close()
            return True
        if current is not None:
            # Replaced a stale (not connected) link; release it.
            current.close()
        # Fresh link: forget any backoff accumulated by the old one.
        self._reconnect_backoff.pop(peer_id, None)
        self._reconnect_last_attempt.pop(peer_id, None)

        self._on_link_established(peer_id)
        return True

    def _on_link_established(self, peer_id: str):
        """Run best-effort side effects after a new link is registered."""
        # On the flat-tier host (Nunba), a new persistent peer means
        # the WAMP router is now needed for realtime push to this
        # mobile device. Safe no-op when Nunba boot already started
        # the router or when wamp_router module isn't present (HARTOS
        # core used in regional/central deployments that manage their
        # own router elsewhere).
        try:
            from wamp_router import ensure_wamp_running  # type: ignore
            ensure_wamp_running(reason=f"peer {peer_id[:8]} upgraded")
        except ImportError:
            pass
        except Exception as e:
            logger.debug(f"ensure_wamp_running skipped: {e}")
        # Best-effort HiveMind enrolment for MoE consensus. The
        # bridge's `register_peer_agent` is a no-op when hevolveai
        # isn't loaded (central HTTP-only tier) so this is safe
        # everywhere and never blocks the link upgrade.
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            bridge = get_world_model_bridge()
            if bridge and hasattr(bridge, 'register_peer_agent'):
                bridge.register_peer_agent(peer_id=peer_id)
        except Exception as e:
            logger.debug(
                f"hive register_peer_agent skipped for "
                f"{peer_id[:8] if peer_id else '?'}: {e}"
            )

    def close_link(self, peer_id: str):
        """Close and remove a link, and drop its reconnect bookkeeping."""
        with self._lock:
            link = self._links.pop(peer_id, None)
        self._reconnect_backoff.pop(peer_id, None)
        self._reconnect_last_attempt.pop(peer_id, None)
        if link:
            link.close()

    def register_channel_handler(self, channel: str, handler: Callable):
        """Register handler for incoming messages on a channel.

        Applied to all current and future links.
        """
        with self._lock:
            self._channel_handlers.setdefault(channel, []).append(handler)
            # Apply to existing links
            for link in self._links.values():
                link.on_message(channel, handler)

    def record_http_exchange(self, peer_id: str):
        """Record a successful HTTP exchange with a peer.

        Every _UPGRADE_THRESHOLD exchanges the counter resets and, unless a
        connected link already exists, an auto-upgrade to PeerLink is tried.
        Called by gossip, federation, etc. after successful HTTP call.
        """
        with self._lock:
            count = self._http_exchange_counts.get(peer_id, 0) + 1
            if count < _UPGRADE_THRESHOLD:
                self._http_exchange_counts[peer_id] = count
                return
            self._http_exchange_counts[peer_id] = 0

        if self.has_link(peer_id):
            return  # Already upgraded; skip the gossip lookup.
        self._try_auto_upgrade(peer_id)

    # --- Status --------------------------------------------------------

    def get_status(self) -> dict:
        """Return a snapshot of manager settings and per-link stats."""
        with self._lock:
            links = {pid: l.get_stats() for pid, l in self._links.items()}

        active = sum(1 for s in links.values() if s['state'] == 'connected')
        encrypted = sum(1 for s in links.values() if s.get('encrypted'))

        return {
            'running': self._running,
            'tier': self._tier,
            'max_links': self._max_links,
            'active_links': active,
            'encrypted_links': encrypted,
            'total_links': len(links),
            'links': links,
        }

    # --- Internal ------------------------------------------------------

    def _maintenance_loop(self):
        """Background: prune idle links and attempt reconnects every ~30 s."""
        while self._running:
            try:
                self._prune_idle_links()
                self._attempt_reconnects()
            except Exception as e:
                logger.debug(f"Maintenance error: {e}")

            # Sleep in small increments to allow clean shutdown
            for _ in range(30):  # 30 seconds
                if not self._running:
                    break
                time.sleep(1)

    def _prune_idle_links(self):
        """Close connected links that have been idle for over _IDLE_TIMEOUT seconds."""
        with self._lock:
            to_close = [
                peer_id for peer_id, link in self._links.items()
                if link.is_connected and link.idle_seconds > _IDLE_TIMEOUT
            ]

        for peer_id in to_close:
            logger.info(f"Pruning idle link: {peer_id[:8]}")
            self.close_link(peer_id)

    def _attempt_reconnects(self):
        """Try to reconnect links in LinkState.DISCONNECTED.

        Per-peer exponential backoff: the wait starts at _RECONNECT_MIN,
        doubles after every failed attempt and is capped at _RECONNECT_MAX.
        """
        now = time.time()
        with self._lock:
            disconnected = [
                (pid, link) for pid, link in self._links.items()
                if link.state == LinkState.DISCONNECTED
            ]

        for peer_id, link in disconnected:
            backoff = self._reconnect_backoff.get(peer_id, _RECONNECT_MIN)
            last_attempt = self._reconnect_last_attempt.get(peer_id, 0)
            if now < last_attempt + backoff:
                continue

            self._reconnect_last_attempt[peer_id] = now
            connected = link.connect()

            with self._lock:
                still_tracked = self._links.get(peer_id) is link
            if not still_tracked:
                # close_link()/stop()/upgrade_peer() dropped or replaced this
                # link while we were connecting; don't resurrect or leak it.
                self._reconnect_backoff.pop(peer_id, None)
                self._reconnect_last_attempt.pop(peer_id, None)
                if connected:
                    link.close()
                continue

            if connected:
                self._reconnect_backoff.pop(peer_id, None)
                self._reconnect_last_attempt.pop(peer_id, None)
            else:
                # Exponential backoff (duration doubles, capped at max)
                self._reconnect_backoff[peer_id] = min(backoff * 2, _RECONNECT_MAX)

    def _evict_weakest_link(self) -> bool:
        """Close the least useful connected link to make room.

        Must be called WITHOUT self._lock held (it acquires the lock and
        calls close_link(), which acquires it again).

        Returns:
            True if a link was evicted, False if no connected link exists.
        """
        with self._lock:
            candidates = [
                (pid, link) for pid, link in self._links.items()
                if link.is_connected
            ]

        if not candidates:
            return False

        # Score: lower = less useful
        def score(item):
            _pid, link = item
            s = 0
            if link.capabilities.get('gpu'):
                s += 10  # GPU peers are valuable
            s -= link.idle_seconds / 60  # Penalize idle
            s += link._messages_received / 100  # Active peers are valuable
            return s

        weakest_id, _ = min(candidates, key=score)
        self.close_link(weakest_id)
        return True

    def _try_auto_upgrade(self, peer_id: str):
        """Auto-upgrade a peer from HTTP to PeerLink using gossip peer info."""
        try:
            from integrations.social.peer_discovery import gossip
            peers = gossip.get_peer_list()
            peer_info = next((p for p in peers if p.get('node_id') == peer_id), None)
            if not peer_info:
                return

            # Strip only a leading scheme (not occurrences elsewhere in the
            # URL) and any trailing slash; tolerate a missing/None url.
            address = peer_info.get('url') or ''
            for scheme in ('http://', 'https://'):
                if address.startswith(scheme):
                    address = address[len(scheme):]
                    break
            address = address.rstrip('/')
            if not address:
                return

            # Determine trust level based on user identity, not network
            # Same user = same authenticated user_id across ANY network
            trust = TrustLevel.PEER  # Default to encrypted
            try:
                # Check compute_mesh (same-user device registry)
                from integrations.agent_engine.compute_mesh_service import get_compute_mesh
                mesh = get_compute_mesh()
                if mesh and peer_id in (mesh._peers or {}):
                    trust = TrustLevel.SAME_USER
            except Exception:
                pass

            if trust != TrustLevel.SAME_USER:
                try:
                    # Check if peer's user_id matches ours (regional/WAN same-user)
                    peer_user_id = peer_info.get('user_id', '')
                    if peer_user_id:
                        local_user_id = os.environ.get('HEVOLVE_USER_ID', '')
                        if not local_user_id:
                            from security.node_integrity import get_node_identity
                            local_user_id = get_node_identity().get('user_id', '')
                        if local_user_id and peer_user_id == local_user_id:
                            trust = TrustLevel.SAME_USER
                except Exception:
                    pass

            self.upgrade_peer(
                peer_id=peer_id,
                address=address,
                trust=trust,
                x25519_public=peer_info.get('x25519_public', ''),
                ed25519_public=peer_info.get('public_key', ''),
            )
        except Exception as e:
            logger.debug(f"Auto-upgrade failed for {peer_id[:8]}: {e}")

    @staticmethod
    def _http_fallback(peer_url: str, channel: str, data: dict,
                       timeout: float = 30.0) -> Optional[dict]:
        """Send via HTTP when no PeerLink is available.

        Returns:
            The parsed JSON body on HTTP 200, otherwise None. Failures are
            logged at debug level rather than raised (best effort).
        """
        url = f"{peer_url.rstrip('/')}/api/peer-link/message"
        try:
            from core.http_pool import pooled_post
            resp = pooled_post(
                url,
                json={'ch': channel, 'd': data},
                timeout=timeout,
            )
            if resp.status_code == 200:
                return resp.json()
            logger.debug(f"HTTP fallback to {url} returned {resp.status_code}")
        except Exception as e:
            logger.debug(f"HTTP fallback to {url} failed: {e}")
        return None
# --- Singleton ---------------------------------------------------------

_manager: Optional[PeerLinkManager] = None
_manager_lock = threading.Lock()


def get_link_manager() -> PeerLinkManager:
    """Return the process-wide PeerLinkManager, creating it on first use.

    The unlocked fast-path check avoids taking the lock once the instance
    exists; the second check under the lock ensures only one thread
    constructs it.
    """
    global _manager
    if _manager is not None:
        return _manager
    with _manager_lock:
        if _manager is None:
            _manager = PeerLinkManager()
    return _manager
def reset_link_manager():
    """Stop and discard the singleton manager, if any (testing only)."""
    global _manager
    existing = _manager
    if existing:
        existing.stop()
    _manager = None