Coverage for core / peer_link / nat.py: 73.3%
135 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2NAT Traversal — get two peers connected regardless of network topology.
4Strategy (try in order, stop at first success):
5 1. LAN direct: Same subnet -> direct WebSocket to peer IP
6 2. STUN: Get external IP via STUN -> try direct connection
7 3. WireGuard: Use compute_mesh WireGuard tunnel -> WS over mesh IP
8 4. Peer relay: Route through a mutual peer with public IP
9 5. Crossbar relay: Last resort (legacy compatibility)
11Reuses existing infrastructure:
12 - compute_mesh_service.py -> STUN server config, WireGuard tunnel
13 - peer_discovery.py -> peer list with addresses
14 - signaling.py -> connection negotiation pattern
16For same-user LAN: strategy 1 always works (UDP beacon discovery).
17For cross-user WAN: strategies 2-5 depending on NAT type.
18"""
19import logging
20import os
21import socket
22import struct
23import threading
24from typing import Optional
26logger = logging.getLogger('hevolve.peer_link')
29class NATType:
30 """Detected NAT type."""
31 NONE = 'none' # Public IP, no NAT
32 FULL_CONE = 'full_cone' # Any external host can reach mapped port
33 RESTRICTED = 'restricted' # Only hosts we've sent to can reach us
34 SYMMETRIC = 'symmetric' # Different mapping per destination (hardest)
35 UNKNOWN = 'unknown'
38class NATTraversal:
39 """Orchestrate NAT traversal to establish peer connection.
41 Returns a WebSocket URL that can be used to connect to the peer,
42 or None if all strategies fail.
43 """
45 def __init__(self, stun_server: str = ''):
46 self._stun_server = stun_server or os.environ.get(
47 'HEVOLVE_STUN_SERVER', 'stun.l.google.com:19302')
48 self._external_ip: Optional[str] = None
49 self._nat_type = NATType.UNKNOWN
50 self._lock = threading.Lock()
52 def resolve_peer_address(self, peer_info: dict) -> Optional[str]:
53 """Try all strategies to get a connectable address for a peer.
55 Args:
56 peer_info: Dict with peer's url, mesh_ip, node_id, etc.
58 Returns:
59 WebSocket URL (ws://host:port/peer_link) or None
60 """
61 peer_url = peer_info.get('url', '')
62 peer_mesh_ip = peer_info.get('mesh_ip', '')
64 # Extract host from URL
65 peer_host = self._extract_host(peer_url)
67 # Strategy 1: LAN direct
68 ws_url = self._try_lan_direct(peer_host)
69 if ws_url:
70 logger.debug(f"NAT: LAN direct to {peer_host}")
71 return ws_url
73 # Strategy 2: Direct WAN (peer might have public IP)
74 ws_url = self._try_direct_wan(peer_host)
75 if ws_url:
76 logger.debug(f"NAT: Direct WAN to {peer_host}")
77 return ws_url
79 # Strategy 3: WireGuard mesh IP
80 if peer_mesh_ip:
81 ws_url = self._try_wireguard(peer_mesh_ip)
82 if ws_url:
83 logger.debug(f"NAT: WireGuard to {peer_mesh_ip}")
84 return ws_url
86 # Strategy 4: Relay through seed peer (not implemented yet - placeholder)
87 # In future: find a mutual peer with public IP and relay through them
89 # Strategy 5: Crossbar relay (legacy fallback)
90 ws_url = self._try_crossbar_relay()
91 if ws_url:
92 logger.debug("NAT: Crossbar relay fallback")
93 return ws_url
95 logger.debug(f"NAT: All strategies failed for {peer_host}")
96 return None
98 def _try_lan_direct(self, peer_host: str) -> Optional[str]:
99 """Check if peer is on same LAN subnet."""
100 if not peer_host:
101 return None
103 try:
104 # Check if peer is reachable on LAN
105 from core.port_registry import get_port
106 port = get_port('backend')
108 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
109 sock.settimeout(2)
110 result = sock.connect_ex((peer_host, port))
111 sock.close()
113 if result == 0:
114 return f'ws://{peer_host}:{port}/peer_link'
115 except Exception:
116 pass
117 return None
119 def _try_direct_wan(self, peer_host: str) -> Optional[str]:
120 """Try direct connection to peer's public address."""
121 if not peer_host or self._is_private_ip(peer_host):
122 return None
124 try:
125 from core.port_registry import get_port
126 port = get_port('backend')
128 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
129 sock.settimeout(5)
130 result = sock.connect_ex((peer_host, port))
131 sock.close()
133 if result == 0:
134 return f'ws://{peer_host}:{port}/peer_link'
135 except Exception:
136 pass
137 return None
139 def _try_wireguard(self, mesh_ip: str) -> Optional[str]:
140 """Try connection through WireGuard mesh tunnel."""
141 if not mesh_ip:
142 return None
144 try:
145 # Check if mesh interface exists
146 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
147 sock.settimeout(3)
148 # WireGuard mesh uses compute_mesh port
149 result = sock.connect_ex((mesh_ip, 6796))
150 sock.close()
152 if result == 0:
153 return f'ws://{mesh_ip}:6796/peer_link'
154 except Exception:
155 pass
156 return None
158 def _try_crossbar_relay(self) -> Optional[str]:
159 """Use Crossbar as relay (last resort)."""
160 crossbar_url = os.environ.get('CBURL', '')
161 if crossbar_url:
162 # Return the Crossbar URL — link_manager will use WAMP relay mode
163 return crossbar_url
164 return None
166 def get_external_ip(self) -> Optional[str]:
167 """Get our external IP via STUN."""
168 if self._external_ip:
169 return self._external_ip
171 try:
172 # Simple STUN request
173 host, port_str = self._stun_server.rsplit(':', 1)
174 port = int(port_str)
176 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
177 sock.settimeout(3)
179 # STUN binding request (simplified)
180 # STUN message: type=0x0001 (binding request), length=0,
181 # magic=0x2112A442, transaction_id=random
182 txn_id = os.urandom(12)
183 request = struct.pack('>HHI', 0x0001, 0, 0x2112A442) + txn_id
185 sock.sendto(request, (host, port))
186 data, _ = sock.recvfrom(1024)
187 sock.close()
189 if len(data) >= 32:
190 # Parse XOR-MAPPED-ADDRESS from response (simplified)
191 # Full STUN parsing would need proper attribute parsing
192 # For now, this is a best-effort extraction
193 for i in range(20, len(data) - 8):
194 attr_type = struct.unpack('>H', data[i:i+2])[0]
195 if attr_type == 0x0020: # XOR-MAPPED-ADDRESS
196 attr_len = struct.unpack('>H', data[i+2:i+4])[0]
197 if attr_len >= 8:
198 xor_port = struct.unpack(
199 '>H', data[i+6:i+8])[0] ^ 0x2112
200 xor_ip = struct.unpack(
201 '>I', data[i+8:i+12])[0] ^ 0x2112A442
202 ip = socket.inet_ntoa(struct.pack('>I', xor_ip))
203 self._external_ip = ip
204 return ip
205 break
206 except Exception as e:
207 logger.debug(f"STUN lookup failed: {e}")
209 return None
211 @staticmethod
212 def _extract_host(url: str) -> str:
213 """Extract hostname from URL."""
214 if not url:
215 return ''
216 # Remove protocol
217 host = url.split('://')[-1]
218 # Remove path
219 host = host.split('/')[0]
220 # Remove port
221 host = host.split(':')[0]
222 return host
224 @staticmethod
225 def _is_private_ip(ip: str) -> bool:
226 """Check if IP is in a private range."""
227 try:
228 parts = [int(p) for p in ip.split('.')]
229 if len(parts) != 4:
230 return False
231 return (parts[0] == 10 or
232 (parts[0] == 172 and 16 <= parts[1] <= 31) or
233 (parts[0] == 192 and parts[1] == 168) or
234 parts[0] == 127)
235 except (ValueError, IndexError):
236 return False
239# Module-level singleton
240_nat: Optional[NATTraversal] = None
241_nat_lock = threading.Lock()
244def get_nat_traversal() -> NATTraversal:
245 global _nat
246 if _nat is None:
247 with _nat_lock:
248 if _nat is None:
249 _nat = NATTraversal()
250 return _nat