Coverage for core / peer_link / nat.py: 73.3%

135 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2NAT Traversal — get two peers connected regardless of network topology. 

3 

4Strategy (try in order, stop at first success): 

5 1. LAN direct: Same subnet -> direct WebSocket to peer IP 

6 2. STUN: Get external IP via STUN -> try direct connection 

7 3. WireGuard: Use compute_mesh WireGuard tunnel -> WS over mesh IP 

8 4. Peer relay: Route through a mutual peer with public IP 

9 5. Crossbar relay: Last resort (legacy compatibility) 

10 

11Reuses existing infrastructure: 

12 - compute_mesh_service.py -> STUN server config, WireGuard tunnel 

13 - peer_discovery.py -> peer list with addresses 

14 - signaling.py -> connection negotiation pattern 

15 

16For same-user LAN: strategy 1 always works (UDP beacon discovery). 

17For cross-user WAN: strategies 2-5 depending on NAT type. 

18""" 

19import logging 

20import os 

21import socket 

22import struct 

23import threading 

24from typing import Optional 

25 

26logger = logging.getLogger('hevolve.peer_link') 

27 

28 

29class NATType: 

30 """Detected NAT type.""" 

31 NONE = 'none' # Public IP, no NAT 

32 FULL_CONE = 'full_cone' # Any external host can reach mapped port 

33 RESTRICTED = 'restricted' # Only hosts we've sent to can reach us 

34 SYMMETRIC = 'symmetric' # Different mapping per destination (hardest) 

35 UNKNOWN = 'unknown' 

36 

37 

38class NATTraversal: 

39 """Orchestrate NAT traversal to establish peer connection. 

40 

41 Returns a WebSocket URL that can be used to connect to the peer, 

42 or None if all strategies fail. 

43 """ 

44 

45 def __init__(self, stun_server: str = ''): 

46 self._stun_server = stun_server or os.environ.get( 

47 'HEVOLVE_STUN_SERVER', 'stun.l.google.com:19302') 

48 self._external_ip: Optional[str] = None 

49 self._nat_type = NATType.UNKNOWN 

50 self._lock = threading.Lock() 

51 

52 def resolve_peer_address(self, peer_info: dict) -> Optional[str]: 

53 """Try all strategies to get a connectable address for a peer. 

54 

55 Args: 

56 peer_info: Dict with peer's url, mesh_ip, node_id, etc. 

57 

58 Returns: 

59 WebSocket URL (ws://host:port/peer_link) or None 

60 """ 

61 peer_url = peer_info.get('url', '') 

62 peer_mesh_ip = peer_info.get('mesh_ip', '') 

63 

64 # Extract host from URL 

65 peer_host = self._extract_host(peer_url) 

66 

67 # Strategy 1: LAN direct 

68 ws_url = self._try_lan_direct(peer_host) 

69 if ws_url: 

70 logger.debug(f"NAT: LAN direct to {peer_host}") 

71 return ws_url 

72 

73 # Strategy 2: Direct WAN (peer might have public IP) 

74 ws_url = self._try_direct_wan(peer_host) 

75 if ws_url: 

76 logger.debug(f"NAT: Direct WAN to {peer_host}") 

77 return ws_url 

78 

79 # Strategy 3: WireGuard mesh IP 

80 if peer_mesh_ip: 

81 ws_url = self._try_wireguard(peer_mesh_ip) 

82 if ws_url: 

83 logger.debug(f"NAT: WireGuard to {peer_mesh_ip}") 

84 return ws_url 

85 

86 # Strategy 4: Relay through seed peer (not implemented yet - placeholder) 

87 # In future: find a mutual peer with public IP and relay through them 

88 

89 # Strategy 5: Crossbar relay (legacy fallback) 

90 ws_url = self._try_crossbar_relay() 

91 if ws_url: 

92 logger.debug("NAT: Crossbar relay fallback") 

93 return ws_url 

94 

95 logger.debug(f"NAT: All strategies failed for {peer_host}") 

96 return None 

97 

98 def _try_lan_direct(self, peer_host: str) -> Optional[str]: 

99 """Check if peer is on same LAN subnet.""" 

100 if not peer_host: 

101 return None 

102 

103 try: 

104 # Check if peer is reachable on LAN 

105 from core.port_registry import get_port 

106 port = get_port('backend') 

107 

108 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

109 sock.settimeout(2) 

110 result = sock.connect_ex((peer_host, port)) 

111 sock.close() 

112 

113 if result == 0: 

114 return f'ws://{peer_host}:{port}/peer_link' 

115 except Exception: 

116 pass 

117 return None 

118 

119 def _try_direct_wan(self, peer_host: str) -> Optional[str]: 

120 """Try direct connection to peer's public address.""" 

121 if not peer_host or self._is_private_ip(peer_host): 

122 return None 

123 

124 try: 

125 from core.port_registry import get_port 

126 port = get_port('backend') 

127 

128 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

129 sock.settimeout(5) 

130 result = sock.connect_ex((peer_host, port)) 

131 sock.close() 

132 

133 if result == 0: 

134 return f'ws://{peer_host}:{port}/peer_link' 

135 except Exception: 

136 pass 

137 return None 

138 

139 def _try_wireguard(self, mesh_ip: str) -> Optional[str]: 

140 """Try connection through WireGuard mesh tunnel.""" 

141 if not mesh_ip: 

142 return None 

143 

144 try: 

145 # Check if mesh interface exists 

146 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

147 sock.settimeout(3) 

148 # WireGuard mesh uses compute_mesh port 

149 result = sock.connect_ex((mesh_ip, 6796)) 

150 sock.close() 

151 

152 if result == 0: 

153 return f'ws://{mesh_ip}:6796/peer_link' 

154 except Exception: 

155 pass 

156 return None 

157 

158 def _try_crossbar_relay(self) -> Optional[str]: 

159 """Use Crossbar as relay (last resort).""" 

160 crossbar_url = os.environ.get('CBURL', '') 

161 if crossbar_url: 

162 # Return the Crossbar URL — link_manager will use WAMP relay mode 

163 return crossbar_url 

164 return None 

165 

166 def get_external_ip(self) -> Optional[str]: 

167 """Get our external IP via STUN.""" 

168 if self._external_ip: 

169 return self._external_ip 

170 

171 try: 

172 # Simple STUN request 

173 host, port_str = self._stun_server.rsplit(':', 1) 

174 port = int(port_str) 

175 

176 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 

177 sock.settimeout(3) 

178 

179 # STUN binding request (simplified) 

180 # STUN message: type=0x0001 (binding request), length=0, 

181 # magic=0x2112A442, transaction_id=random 

182 txn_id = os.urandom(12) 

183 request = struct.pack('>HHI', 0x0001, 0, 0x2112A442) + txn_id 

184 

185 sock.sendto(request, (host, port)) 

186 data, _ = sock.recvfrom(1024) 

187 sock.close() 

188 

189 if len(data) >= 32: 

190 # Parse XOR-MAPPED-ADDRESS from response (simplified) 

191 # Full STUN parsing would need proper attribute parsing 

192 # For now, this is a best-effort extraction 

193 for i in range(20, len(data) - 8): 

194 attr_type = struct.unpack('>H', data[i:i+2])[0] 

195 if attr_type == 0x0020: # XOR-MAPPED-ADDRESS 

196 attr_len = struct.unpack('>H', data[i+2:i+4])[0] 

197 if attr_len >= 8: 

198 xor_port = struct.unpack( 

199 '>H', data[i+6:i+8])[0] ^ 0x2112 

200 xor_ip = struct.unpack( 

201 '>I', data[i+8:i+12])[0] ^ 0x2112A442 

202 ip = socket.inet_ntoa(struct.pack('>I', xor_ip)) 

203 self._external_ip = ip 

204 return ip 

205 break 

206 except Exception as e: 

207 logger.debug(f"STUN lookup failed: {e}") 

208 

209 return None 

210 

211 @staticmethod 

212 def _extract_host(url: str) -> str: 

213 """Extract hostname from URL.""" 

214 if not url: 

215 return '' 

216 # Remove protocol 

217 host = url.split('://')[-1] 

218 # Remove path 

219 host = host.split('/')[0] 

220 # Remove port 

221 host = host.split(':')[0] 

222 return host 

223 

224 @staticmethod 

225 def _is_private_ip(ip: str) -> bool: 

226 """Check if IP is in a private range.""" 

227 try: 

228 parts = [int(p) for p in ip.split('.')] 

229 if len(parts) != 4: 

230 return False 

231 return (parts[0] == 10 or 

232 (parts[0] == 172 and 16 <= parts[1] <= 31) or 

233 (parts[0] == 192 and parts[1] == 168) or 

234 parts[0] == 127) 

235 except (ValueError, IndexError): 

236 return False 

237 

238 

239# Module-level singleton 

240_nat: Optional[NATTraversal] = None 

241_nat_lock = threading.Lock() 

242 

243 

244def get_nat_traversal() -> NATTraversal: 

245 global _nat 

246 if _nat is None: 

247 with _nat_lock: 

248 if _nat is None: 

249 _nat = NATTraversal() 

250 return _nat