Coverage for core / peer_link / link.py: 41.8%

390 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2PeerLink — persistent WebSocket connection to a single peer. 

3 

4Trust-boundary-aware encryption: 

5 SAME_USER: Your own devices — ANY network (LAN, WAN, regional). 

6 No encryption (plain WebSocket). Trust based on authenticated 

7 user identity (same user_id), NOT network proximity. 

8 LAN discovered via UDP beacon; WAN discovered via compute_mesh 

9 or gossip when user_id matches. 

10 PEER: Another user's device. E2E encrypted (AES-256-GCM session key). 

11 They cannot inspect your payload. 

12 RELAY: Traffic passing through intermediate node. E2E encrypted. 

13 Relay sees only opaque bytes. 

14 

15Wire format: 

16 Text frame: {"ch":"gossip","id":"msg123","d":{...}} 

17 Binary frame: [1B channel_id][4B msg_id_hash][payload bytes] 

18 

19 For PEER/RELAY trust: entire frame is AES-256-GCM encrypted before sending. 

20 For SAME_USER: sent as-is (plain). 

21 

22Session lifecycle: 

23 1. WebSocket connect (ws:// for LAN, wss:// for WAN) 

24 2. Handshake: exchange node identity (Ed25519 pub key + X25519 pub key) 

25 3. If PEER/RELAY: ECDH key exchange -> derive session_key 

26 4. Mutual Ed25519 signature verification 

27 5. Exchange capabilities (GPU, models, tier) 

28 6. Ready — multiplexed channels active 

29 

30 Key rotation: new ECDH every 3600 seconds (forward secrecy) 

31""" 

32import hashlib 

33import json 

34import logging 

35import os 

36import struct 

37import threading 

38import time 

39import uuid 

40from enum import Enum 

41from typing import Any, Callable, Dict, List, Optional, Tuple 

42 

43logger = logging.getLogger('hevolve.peer_link') 

44 

45 

46class TrustLevel(Enum): 

47 SAME_USER = 'same_user' # Own devices on ANY network (LAN + WAN + regional) — no encryption 

48 PEER = 'peer' # Other user's device — E2E mandatory 

49 RELAY = 'relay' # Through intermediate — E2E mandatory 

50 

51 def trust_rank(self) -> int: 

52 """Numeric rank for trust comparison (higher = more trusted). 

53 

54 G9: Used by the trust ratchet to prevent downgrade attacks. 

55 """ 

56 return _TRUST_RANKS.get(self, 0) 

57 

58 

59# Trust ordering: RELAY < PEER < SAME_USER 

60_TRUST_RANKS = { 

61 TrustLevel.RELAY: 0, 

62 TrustLevel.PEER: 1, 

63 TrustLevel.SAME_USER: 2, 

64} 

65 

66 

67class LinkState(Enum): 

68 DISCONNECTED = 'disconnected' 

69 CONNECTING = 'connecting' 

70 HANDSHAKING = 'handshaking' 

71 CONNECTED = 'connected' 

72 CLOSING = 'closing' 

73 

74 

75# Channel IDs for binary frames — single source of truth is CHANNEL_REGISTRY 

76# in core.peer_link.channels. Re-exported here for backwards compatibility. 

77from core.peer_link.channels import CHANNEL_IDS, CHANNEL_NAMES # noqa: E402 

78 

79# Key rotation interval (seconds) 

80KEY_ROTATION_INTERVAL = 3600 

81 

82 

83class PeerLink: 

84 """Persistent WebSocket connection to a single peer. 

85 

86 Encryption is determined by TrustLevel: 

87 - SAME_USER: plain (your own devices, trusted) 

88 - PEER/RELAY: AES-256-GCM with session key from X25519 ECDH 

89 """ 

90 

91 def __init__(self, peer_id: str, address: str, trust: TrustLevel, 

92 x25519_public_hex: str = '', ed25519_public_hex: str = '', 

93 capabilities: Optional[dict] = None): 

94 self.peer_id = peer_id 

95 self.address = address # host:port or ws:// URL 

96 self.trust = trust 

97 self.peer_x25519_public = x25519_public_hex 

98 self.peer_ed25519_public = ed25519_public_hex 

99 self.capabilities = capabilities or {} 

100 

101 # G9: Trust ratchet — once trust is established at a level, 

102 # it can only be UPGRADED (never downgraded) during this session. 

103 # Prevents trust downgrade attacks where a MITM claims PEER after 

104 # SAME_USER was already verified. 

105 self._min_trust_level: TrustLevel = trust 

106 

107 self._state = LinkState.DISCONNECTED 

108 self._ws = None 

109 self._session_key: Optional[bytes] = None # AES-256 key (PEER/RELAY only) 

110 self._session_nonce_counter = 0 

111 self._key_established_at = 0.0 

112 self._lock = threading.Lock() 

113 self._message_handlers: Dict[str, List[Callable]] = {} 

114 self._pending_responses: Dict[str, threading.Event] = {} 

115 self._response_data: Dict[str, Any] = {} 

116 self._recv_thread: Optional[threading.Thread] = None 

117 

118 # Stats 

119 self._connected_at = 0.0 

120 self._last_activity = 0.0 

121 self._messages_sent = 0 

122 self._messages_received = 0 

123 self._bytes_sent = 0 

124 self._bytes_received = 0 

125 

126 @property 

127 def state(self) -> LinkState: 

128 return self._state 

129 

130 @property 

131 def is_connected(self) -> bool: 

132 return self._state == LinkState.CONNECTED 

133 

134 @property 

135 def is_encrypted(self) -> bool: 

136 """E2E encryption active (PEER/RELAY trust only).""" 

137 return self._session_key is not None 

138 

139 @property 

140 def min_trust_level(self) -> TrustLevel: 

141 """The minimum trust level established during this session (ratchet floor).""" 

142 return self._min_trust_level 

143 

144 def set_trust(self, new_trust: TrustLevel) -> bool: 

145 """Set trust level with ratchet enforcement (G9). 

146 

147 Trust can only be UPGRADED, never downgraded during a session. 

148 Returns True if trust was set, False if downgrade was rejected. 

149 """ 

150 if new_trust.trust_rank() < self._min_trust_level.trust_rank(): 

151 logger.warning( 

152 f"Trust downgrade REJECTED for {self.peer_id[:8]}: " 

153 f"{new_trust.value} < {self._min_trust_level.value} (ratchet)") 

154 return False 

155 self.trust = new_trust 

156 # Ratchet upward 

157 if new_trust.trust_rank() > self._min_trust_level.trust_rank(): 

158 self._min_trust_level = new_trust 

159 logger.info( 

160 f"Trust UPGRADED for {self.peer_id[:8]}: → {new_trust.value}") 

161 return True 

162 

163 @property 

164 def idle_seconds(self) -> float: 

165 if self._last_activity == 0: 

166 return 0 

167 return time.time() - self._last_activity 

168 

169 def connect(self) -> bool: 

170 """Initiate outgoing connection to peer.""" 

171 if self._state != LinkState.DISCONNECTED: 

172 return self._state == LinkState.CONNECTED 

173 

174 self._state = LinkState.CONNECTING 

175 try: 

176 ws_url = self._resolve_ws_url() 

177 

178 try: 

179 import websockets.sync.client as ws_client 

180 self._ws = ws_client.connect(ws_url, open_timeout=10, 

181 close_timeout=5) 

182 except ImportError: 

183 # Fallback: use websocket-client library 

184 try: 

185 import websocket 

186 self._ws = websocket.WebSocket() 

187 self._ws.connect(ws_url, timeout=10) 

188 except ImportError: 

189 logger.warning("No WebSocket library available (need websockets or websocket-client)") 

190 self._state = LinkState.DISCONNECTED 

191 return False 

192 

193 self._state = LinkState.HANDSHAKING 

194 if not self._perform_handshake(): 

195 self.close() 

196 return False 

197 

198 self._state = LinkState.CONNECTED 

199 self._connected_at = time.time() 

200 self._last_activity = time.time() 

201 

202 # Start receive loop 

203 self._recv_thread = threading.Thread( 

204 target=self._receive_loop, daemon=True, 

205 name=f'peerlink-recv-{self.peer_id[:8]}') 

206 self._recv_thread.start() 

207 

208 logger.info(f"PeerLink connected to {self.peer_id[:8]} " 

209 f"(trust={self.trust.value}, encrypted={self.is_encrypted})") 

210 return True 

211 

212 except Exception as e: 

213 logger.debug(f"PeerLink connect failed to {self.peer_id[:8]}: {e}") 

214 self._state = LinkState.DISCONNECTED 

215 return False 

216 

217 def accept(self, ws, handshake_data: dict) -> bool: 

218 """Accept incoming connection (called by link_manager's WS server).""" 

219 self._ws = ws 

220 self._state = LinkState.HANDSHAKING 

221 

222 try: 

223 if not self._complete_handshake(handshake_data): 

224 self.close() 

225 return False 

226 

227 self._state = LinkState.CONNECTED 

228 self._connected_at = time.time() 

229 self._last_activity = time.time() 

230 

231 self._recv_thread = threading.Thread( 

232 target=self._receive_loop, daemon=True, 

233 name=f'peerlink-recv-{self.peer_id[:8]}') 

234 self._recv_thread.start() 

235 

236 logger.info(f"PeerLink accepted from {self.peer_id[:8]} " 

237 f"(trust={self.trust.value}, encrypted={self.is_encrypted})") 

238 return True 

239 except Exception as e: 

240 logger.debug(f"PeerLink accept failed: {e}") 

241 self._state = LinkState.DISCONNECTED 

242 return False 

243 

244 def send(self, channel: str, data: dict, wait_response: bool = False, 

245 timeout: float = 30.0) -> Optional[dict]: 

246 """Send JSON message on a channel. 

247 

248 Args: 

249 channel: Channel name (gossip, federation, compute, etc.) 

250 data: JSON-serializable dict 

251 wait_response: If True, block until response received 

252 timeout: Max wait time for response 

253 

254 Returns: 

255 Response dict if wait_response=True, else None 

256 """ 

257 if not self.is_connected or self._ws is None: 

258 return None 

259 

260 msg_id = uuid.uuid4().hex[:12] 

261 frame = json.dumps({ 

262 'ch': channel, 

263 'id': msg_id, 

264 'd': data, 

265 }, separators=(',', ':')) 

266 

267 frame_bytes = frame.encode('utf-8') 

268 

269 # Encrypt for PEER/RELAY trust 

270 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self._session_key: 

271 frame_bytes = self._encrypt(frame_bytes) 

272 

273 event = None 

274 if wait_response: 

275 event = threading.Event() 

276 self._pending_responses[msg_id] = event 

277 

278 try: 

279 self._ws_send(frame_bytes) 

280 self._messages_sent += 1 

281 self._bytes_sent += len(frame_bytes) 

282 self._last_activity = time.time() 

283 except Exception as e: 

284 logger.debug(f"PeerLink send failed: {e}") 

285 self._pending_responses.pop(msg_id, None) 

286 self._handle_disconnect() 

287 return None 

288 

289 if event: 

290 event.wait(timeout=timeout) 

291 self._pending_responses.pop(msg_id, None) 

292 return self._response_data.pop(msg_id, None) 

293 

294 return None 

295 

296 def send_binary(self, channel: str, data: bytes) -> bool: 

297 """Send binary data on a channel (sensor frames, etc.).""" 

298 if not self.is_connected or self._ws is None: 

299 return False 

300 

301 ch_id = CHANNEL_IDS.get(channel, 0xFF) 

302 msg_id_bytes = struct.pack('>I', hash(time.time()) & 0xFFFFFFFF) 

303 frame = bytes([ch_id]) + msg_id_bytes + data 

304 

305 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self._session_key: 

306 frame = self._encrypt(frame) 

307 

308 try: 

309 self._ws_send_binary(frame) 

310 self._messages_sent += 1 

311 self._bytes_sent += len(frame) 

312 self._last_activity = time.time() 

313 return True 

314 except Exception: 

315 self._handle_disconnect() 

316 return False 

317 

318 def on_message(self, channel: str, handler: Callable) -> None: 

319 """Register handler for incoming messages on a channel.""" 

320 if channel not in self._message_handlers: 

321 self._message_handlers[channel] = [] 

322 self._message_handlers[channel].append(handler) 

323 

324 def _verify_same_user_proof(self, proof: str, peer_public_key: str) -> bool: 

325 """Verify that the peer is owned by the same user. 

326 

327 Proof = peer signs our user_id with their Ed25519 key, and we 

328 verify that their user_id matches ours. This prevents an attacker 

329 from claiming SAME_USER trust without holding the user's key. 

330 """ 

331 try: 

332 from security.node_integrity import verify_message_signature 

333 # The proof should be a signature of the local user_id 

334 local_user_id = os.environ.get('HEVOLVE_USER_ID', '') 

335 if not local_user_id or not peer_public_key: 

336 return False 

337 return verify_message_signature(peer_public_key, local_user_id, proof) 

338 except (ImportError, Exception) as e: 

339 logger.debug(f"SAME_USER proof verification failed: {e}") 

340 return False 

341 

342 def close(self) -> None: 

343 """Close the connection.""" 

344 if self._state == LinkState.CLOSING: 

345 return 

346 

347 # Send bye BEFORE setting state — send() checks is_connected (state==CONNECTED) 

348 try: 

349 if self._ws: 

350 self.send('control', {'type': 'bye'}) 

351 except Exception: 

352 pass 

353 

354 self._state = LinkState.CLOSING 

355 

356 try: 

357 if self._ws: 

358 if hasattr(self._ws, 'close'): 

359 self._ws.close() 

360 except Exception: 

361 pass 

362 

363 self._ws = None 

364 self._session_key = None 

365 self._state = LinkState.DISCONNECTED 

366 logger.debug(f"PeerLink closed: {self.peer_id[:8]}") 

367 

368 def get_stats(self) -> dict: 

369 return { 

370 'peer_id': self.peer_id, 

371 'state': self._state.value, 

372 'trust': self.trust.value, 

373 'encrypted': self.is_encrypted, 

374 'connected_seconds': (time.time() - self._connected_at) if self._connected_at else 0, 

375 'idle_seconds': self.idle_seconds, 

376 'messages_sent': self._messages_sent, 

377 'messages_received': self._messages_received, 

378 'bytes_sent': self._bytes_sent, 

379 'bytes_received': self._bytes_received, 

380 'capabilities': self.capabilities, 

381 } 

382 

383 # --- Internal: Handshake ------------------------------------------- 

384 

385 def _perform_handshake(self) -> bool: 

386 """Outgoing handshake: send our identity, receive theirs.""" 

387 try: 

388 from security.node_integrity import get_public_key_hex, sign_json_payload 

389 from security.channel_encryption import get_x25519_public_hex 

390 except ImportError: 

391 logger.warning("Security modules not available — handshake failed") 

392 return False 

393 

394 hello = { 

395 'type': 'hello', 

396 'node_id': self.peer_id, # Will be overwritten with OUR id below 

397 'ed25519_public': get_public_key_hex(), 

398 'x25519_public': get_x25519_public_hex(), 

399 'trust_requested': self.trust.value, 

400 'protocol_version': 1, 

401 'timestamp': time.time(), 

402 } 

403 

404 # Get our actual node_id 

405 try: 

406 from security.node_integrity import get_node_identity 

407 identity = get_node_identity() 

408 hello['node_id'] = identity.get('node_id', '') 

409 except Exception: 

410 pass 

411 

412 # Attach pre-trust contract (proves we agreed to hive terms) 

413 try: 

414 from security.pre_trust_contract import get_pre_trust_verifier 

415 from security.node_integrity import get_node_identity 

416 nid = get_node_identity().get('node_id', '') 

417 verifier = get_pre_trust_verifier() 

418 contract = verifier.get_contract(nid) 

419 if contract: 

420 hello['trust_contract'] = contract 

421 except Exception: 

422 pass # Contract optional for SAME_USER trust 

423 

424 hello['signature'] = sign_json_payload(hello) 

425 

426 # Send hello 

427 hello_bytes = json.dumps(hello, separators=(',', ':')).encode('utf-8') 

428 self._ws_send(hello_bytes) 

429 

430 # Receive hello back 

431 resp_bytes = self._ws_recv(timeout=10) 

432 if not resp_bytes: 

433 return False 

434 

435 try: 

436 resp = json.loads(resp_bytes if isinstance(resp_bytes, str) 

437 else resp_bytes.decode('utf-8')) 

438 except (json.JSONDecodeError, UnicodeDecodeError): 

439 return False 

440 

441 if resp.get('type') != 'hello_ack': 

442 return False 

443 

444 # Verify their Ed25519 signature 

445 peer_ed25519 = resp.get('ed25519_public', '') 

446 peer_sig = resp.pop('signature', '') 

447 if peer_ed25519 and peer_sig: 

448 from security.node_integrity import verify_json_signature 

449 if not verify_json_signature(peer_ed25519, resp, peer_sig): 

450 logger.warning(f"Handshake signature verification failed for {self.peer_id[:8]}") 

451 return False 

452 elif os.environ.get('HEVOLVE_ENFORCEMENT_MODE') == 'hard': 

453 # Hard mode: reject unsigned handshakes 

454 logger.warning(f"Unsigned handshake rejected (hard enforcement) for {self.peer_id[:8]}") 

455 return False 

456 

457 # Store peer's keys 

458 self.peer_ed25519_public = peer_ed25519 

459 self.peer_x25519_public = resp.get('x25519_public', '') 

460 self.capabilities = resp.get('capabilities', {}) 

461 

462 # Derive session key for PEER/RELAY trust 

463 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self.peer_x25519_public: 

464 self._derive_session_key() 

465 

466 return True 

467 

468 def _complete_handshake(self, hello_data: dict) -> bool: 

469 """Incoming handshake: we received their hello, send ack.""" 

470 try: 

471 from security.node_integrity import ( 

472 get_public_key_hex, sign_json_payload, verify_json_signature) 

473 from security.channel_encryption import get_x25519_public_hex 

474 except ImportError: 

475 return False 

476 

477 # Verify their signature 

478 peer_sig = hello_data.pop('signature', '') 

479 peer_ed25519 = hello_data.get('ed25519_public', '') 

480 if peer_ed25519 and peer_sig: 

481 if not verify_json_signature(peer_ed25519, hello_data, peer_sig): 

482 logger.warning("Incoming handshake signature verification failed") 

483 return False 

484 elif os.environ.get('HEVOLVE_ENFORCEMENT_MODE') == 'hard': 

485 logger.warning("Unsigned incoming handshake rejected (hard enforcement)") 

486 return False 

487 

488 self.peer_ed25519_public = peer_ed25519 

489 self.peer_x25519_public = hello_data.get('x25519_public', '') 

490 self.capabilities = hello_data.get('capabilities', {}) 

491 

492 # Determine trust LOCALLY — never accept trust_requested from wire. 

493 # SAME_USER requires proof: peer must present a user_id_signature 

494 # signed by the same user key we hold. Without proof → PEER. 

495 # G9: Trust ratchet enforced via set_trust() — cannot downgrade. 

496 requested_trust = hello_data.get('trust_requested', 'peer') 

497 if requested_trust == 'same_user': 

498 # Verify SAME_USER claim cryptographically 

499 user_proof = hello_data.get('user_id_proof', '') 

500 if user_proof and self._verify_same_user_proof(user_proof, peer_ed25519): 

501 if not self.set_trust(TrustLevel.SAME_USER): 

502 logger.warning("Trust ratchet rejected SAME_USER (should not happen — upgrade)") 

503 return False 

504 else: 

505 logger.warning("SAME_USER trust requested but no valid proof — setting PEER") 

506 if not self.set_trust(TrustLevel.PEER): 

507 logger.warning("Trust ratchet rejected PEER downgrade from higher level") 

508 return False 

509 else: 

510 if not self.set_trust(TrustLevel.PEER): 

511 logger.warning("Trust ratchet rejected PEER downgrade from higher level") 

512 return False 

513 

514 # Verify pre-trust contract for PEER connections 

515 # SAME_USER (own devices) are exempt — trust is user identity based 

516 if self.trust != TrustLevel.SAME_USER: 

517 try: 

518 from security.pre_trust_contract import ( 

519 verify_trust_contract, TrustContract, 

520 get_pre_trust_verifier, 

521 ) 

522 contract_data = hello_data.get('trust_contract') 

523 if contract_data: 

524 contract = TrustContract(**{ 

525 k: v for k, v in contract_data.items() 

526 if k in TrustContract.__dataclass_fields__ 

527 }) 

528 ok, msg = verify_trust_contract(contract) 

529 if not ok: 

530 logger.warning( 

531 f"Pre-trust contract rejected for " 

532 f"{self.peer_id[:8]}: {msg}") 

533 return False 

534 # Register verified contract 

535 get_pre_trust_verifier().register_contract(contract) 

536 logger.info( 

537 f"Pre-trust contract verified for {self.peer_id[:8]}") 

538 except ImportError: 

539 pass # Module not available — allow legacy connections 

540 

541 # Send ack 

542 ack = { 

543 'type': 'hello_ack', 

544 'ed25519_public': get_public_key_hex(), 

545 'x25519_public': get_x25519_public_hex(), 

546 'protocol_version': 1, 

547 'capabilities': self._get_local_capabilities(), 

548 'timestamp': time.time(), 

549 } 

550 ack['signature'] = sign_json_payload(ack) 

551 

552 ack_bytes = json.dumps(ack, separators=(',', ':')).encode('utf-8') 

553 self._ws_send(ack_bytes) 

554 

555 # Derive session key for PEER/RELAY trust 

556 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self.peer_x25519_public: 

557 self._derive_session_key() 

558 

559 return True 

560 

561 def _derive_session_key(self): 

562 """Derive AES-256-GCM session key from X25519 ECDH.""" 

563 try: 

564 from security.channel_encryption import get_x25519_keypair 

565 from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey 

566 from cryptography.hazmat.primitives.kdf.hkdf import HKDF 

567 from cryptography.hazmat.primitives import hashes 

568 

569 our_private, _ = get_x25519_keypair() 

570 peer_pub = X25519PublicKey.from_public_bytes( 

571 bytes.fromhex(self.peer_x25519_public)) 

572 shared_secret = our_private.exchange(peer_pub) 

573 

574 self._session_key = HKDF( 

575 algorithm=hashes.SHA256(), 

576 length=32, 

577 salt=b'hart-peerlink-session-v1', 

578 info=b'hart-peerlink-v1', 

579 ).derive(shared_secret) 

580 

581 self._key_established_at = time.time() 

582 self._session_nonce_counter = 0 

583 logger.debug(f"Session key derived for {self.peer_id[:8]}") 

584 except Exception as e: 

585 logger.warning(f"Session key derivation failed: {e}") 

586 self._session_key = None 

587 

588 # --- Internal: Encryption ------------------------------------------ 

589 

590 def _encrypt(self, plaintext: bytes) -> bytes: 

591 """Encrypt with session key (AES-256-GCM). Prepends nonce.""" 

592 if not self._session_key: 

593 return plaintext 

594 

595 from cryptography.hazmat.primitives.ciphers.aead import AESGCM 

596 

597 # Check key rotation 

598 if (time.time() - self._key_established_at) > KEY_ROTATION_INTERVAL: 

599 self._derive_session_key() 

600 

601 nonce = os.urandom(12) 

602 ct = AESGCM(self._session_key).encrypt(nonce, plaintext, None) 

603 return nonce + ct # 12 bytes nonce + ciphertext 

604 

605 def _decrypt(self, data: bytes) -> Optional[bytes]: 

606 """Decrypt with session key. Expects nonce prefix.""" 

607 if not self._session_key: 

608 return data 

609 if len(data) < 13: # 12 nonce + at least 1 byte 

610 return None 

611 

612 from cryptography.hazmat.primitives.ciphers.aead import AESGCM 

613 

614 nonce = data[:12] 

615 ct = data[12:] 

616 try: 

617 return AESGCM(self._session_key).decrypt(nonce, ct, None) 

618 except Exception as e: 

619 logger.debug(f"Decrypt failed: {e}") 

620 return None 

621 

622 # --- Internal: WebSocket I/O --------------------------------------- 

623 

624 def _resolve_ws_url(self) -> str: 

625 """Resolve address to WebSocket URL.""" 

626 addr = self.address 

627 if addr.startswith('ws://') or addr.startswith('wss://'): 

628 return addr 

629 # Default: plain WS for LAN, secure for WAN 

630 if self.trust == TrustLevel.SAME_USER: 

631 return f'ws://{addr}/peer_link' 

632 return f'ws://{addr}/peer_link' # TLS handled at transport level if needed 

633 

634 def _ws_send(self, data: bytes) -> None: 

635 """Send bytes over WebSocket (handles different libraries).""" 

636 if self._ws is None: 

637 raise ConnectionError("WebSocket not connected") 

638 if hasattr(self._ws, 'send'): 

639 self._ws.send(data) 

640 

641 def _ws_send_binary(self, data: bytes) -> None: 

642 """Send binary data over WebSocket.""" 

643 if self._ws is None: 

644 raise ConnectionError("WebSocket not connected") 

645 if hasattr(self._ws, 'send'): 

646 # websockets library 

647 self._ws.send(data) 

648 

649 def _ws_recv(self, timeout: float = 30.0) -> Optional[bytes]: 

650 """Receive from WebSocket with timeout.""" 

651 if self._ws is None: 

652 return None 

653 try: 

654 if hasattr(self._ws, 'recv'): 

655 # websockets sync client has recv(timeout) 

656 try: 

657 return self._ws.recv(timeout=timeout) 

658 except TypeError: 

659 # websocket-client doesn't have timeout param on recv 

660 self._ws.settimeout(timeout) 

661 return self._ws.recv() 

662 except Exception: 

663 return None 

664 

665 def _receive_loop(self): 

666 """Background thread: receive and dispatch messages.""" 

667 while self._state == LinkState.CONNECTED and self._ws is not None: 

668 try: 

669 raw = self._ws_recv(timeout=60) 

670 if raw is None: 

671 continue 

672 

673 if isinstance(raw, str): 

674 raw = raw.encode('utf-8') 

675 

676 # Decrypt if needed 

677 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self._session_key: 

678 decrypted = self._decrypt(raw) 

679 if decrypted is None: 

680 continue 

681 raw = decrypted 

682 

683 self._messages_received += 1 

684 self._bytes_received += len(raw) 

685 self._last_activity = time.time() 

686 

687 # Try JSON (text message) 

688 try: 

689 msg = json.loads(raw.decode('utf-8') if isinstance(raw, bytes) else raw) 

690 channel = msg.get('ch', 'control') 

691 msg_id = msg.get('id', '') 

692 data = msg.get('d', {}) 

693 

694 # Check if this is a response to a pending request 

695 if msg.get('re') and msg['re'] in self._pending_responses: 

696 self._response_data[msg['re']] = data 

697 self._pending_responses[msg['re']].set() 

698 continue 

699 

700 # Dispatch to handlers 

701 handlers = self._message_handlers.get(channel, []) 

702 for handler in handlers: 

703 try: 

704 handler(channel, data, self.peer_id) 

705 except Exception as e: 

706 logger.debug(f"Handler error on {channel}: {e}") 

707 continue 

708 except (json.JSONDecodeError, UnicodeDecodeError): 

709 pass 

710 

711 # Binary message 

712 if len(raw) >= 5: 

713 ch_id = raw[0] 

714 channel = CHANNEL_NAMES.get(ch_id, 'unknown') 

715 payload = raw[5:] # skip channel_id + msg_id_hash 

716 handlers = self._message_handlers.get(channel, []) 

717 for handler in handlers: 

718 try: 

719 handler(channel, payload, self.peer_id) 

720 except Exception as e: 

721 logger.debug(f"Binary handler error on {channel}: {e}") 

722 

723 except Exception as e: 

724 if self._state == LinkState.CONNECTED: 

725 logger.debug(f"Receive loop error: {e}") 

726 self._handle_disconnect() 

727 break 

728 

729 def _handle_disconnect(self): 

730 """Handle unexpected disconnection.""" 

731 if self._state != LinkState.CONNECTED: 

732 return 

733 self._state = LinkState.DISCONNECTED 

734 self._ws = None 

735 self._session_key = None 

736 logger.info(f"PeerLink disconnected: {self.peer_id[:8]}") 

737 

738 @staticmethod 

739 def _get_local_capabilities() -> dict: 

740 """Get local node capabilities for handshake.""" 

741 caps = {'cpu_count': os.cpu_count() or 1} 

742 try: 

743 from integrations.service_tools.vram_manager import detect_gpu 

744 gpu = detect_gpu() 

745 if gpu.get('available'): 

746 caps['gpu'] = gpu.get('device_name', 'GPU') 

747 caps['vram_mb'] = gpu.get('vram_total_mb', 0) 

748 except Exception: 

749 pass 

750 try: 

751 from security.key_delegation import get_node_tier 

752 caps['tier'] = get_node_tier() 

753 except Exception: 

754 caps['tier'] = 'flat' 

755 return caps