Coverage for core / peer_link / link.py: 41.8%
390 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2PeerLink — persistent WebSocket connection to a single peer.
4Trust-boundary-aware encryption:
5 SAME_USER: Your own devices — ANY network (LAN, WAN, regional).
6 No encryption (plain WebSocket). Trust based on authenticated
7 user identity (same user_id), NOT network proximity.
8 LAN discovered via UDP beacon; WAN discovered via compute_mesh
9 or gossip when user_id matches.
10 PEER: Another user's device. E2E encrypted (AES-256-GCM session key).
11 They cannot inspect your payload.
12 RELAY: Traffic passing through intermediate node. E2E encrypted.
13 Relay sees only opaque bytes.
15Wire format:
16 Text frame: {"ch":"gossip","id":"msg123","d":{...}}
17 Binary frame: [1B channel_id][4B msg_id_hash][payload bytes]
19 For PEER/RELAY trust: entire frame is AES-256-GCM encrypted before sending.
20 For SAME_USER: sent as-is (plain).
22Session lifecycle:
23 1. WebSocket connect (ws:// for LAN, wss:// for WAN)
24 2. Handshake: exchange node identity (Ed25519 pub key + X25519 pub key)
25 3. If PEER/RELAY: ECDH key exchange -> derive session_key
26 4. Mutual Ed25519 signature verification
27 5. Exchange capabilities (GPU, models, tier)
28 6. Ready — multiplexed channels active
30 Key rotation: new ECDH every 3600 seconds (forward secrecy)
31"""
32import hashlib
33import json
34import logging
35import os
36import struct
37import threading
38import time
39import uuid
40from enum import Enum
41from typing import Any, Callable, Dict, List, Optional, Tuple
43logger = logging.getLogger('hevolve.peer_link')
46class TrustLevel(Enum):
47 SAME_USER = 'same_user' # Own devices on ANY network (LAN + WAN + regional) — no encryption
48 PEER = 'peer' # Other user's device — E2E mandatory
49 RELAY = 'relay' # Through intermediate — E2E mandatory
51 def trust_rank(self) -> int:
52 """Numeric rank for trust comparison (higher = more trusted).
54 G9: Used by the trust ratchet to prevent downgrade attacks.
55 """
56 return _TRUST_RANKS.get(self, 0)
59# Trust ordering: RELAY < PEER < SAME_USER
60_TRUST_RANKS = {
61 TrustLevel.RELAY: 0,
62 TrustLevel.PEER: 1,
63 TrustLevel.SAME_USER: 2,
64}
67class LinkState(Enum):
68 DISCONNECTED = 'disconnected'
69 CONNECTING = 'connecting'
70 HANDSHAKING = 'handshaking'
71 CONNECTED = 'connected'
72 CLOSING = 'closing'
75# Channel IDs for binary frames — single source of truth is CHANNEL_REGISTRY
76# in core.peer_link.channels. Re-exported here for backwards compatibility.
77from core.peer_link.channels import CHANNEL_IDS, CHANNEL_NAMES # noqa: E402
79# Key rotation interval (seconds)
80KEY_ROTATION_INTERVAL = 3600
83class PeerLink:
84 """Persistent WebSocket connection to a single peer.
86 Encryption is determined by TrustLevel:
87 - SAME_USER: plain (your own devices, trusted)
88 - PEER/RELAY: AES-256-GCM with session key from X25519 ECDH
89 """
91 def __init__(self, peer_id: str, address: str, trust: TrustLevel,
92 x25519_public_hex: str = '', ed25519_public_hex: str = '',
93 capabilities: Optional[dict] = None):
94 self.peer_id = peer_id
95 self.address = address # host:port or ws:// URL
96 self.trust = trust
97 self.peer_x25519_public = x25519_public_hex
98 self.peer_ed25519_public = ed25519_public_hex
99 self.capabilities = capabilities or {}
101 # G9: Trust ratchet — once trust is established at a level,
102 # it can only be UPGRADED (never downgraded) during this session.
103 # Prevents trust downgrade attacks where a MITM claims PEER after
104 # SAME_USER was already verified.
105 self._min_trust_level: TrustLevel = trust
107 self._state = LinkState.DISCONNECTED
108 self._ws = None
109 self._session_key: Optional[bytes] = None # AES-256 key (PEER/RELAY only)
110 self._session_nonce_counter = 0
111 self._key_established_at = 0.0
112 self._lock = threading.Lock()
113 self._message_handlers: Dict[str, List[Callable]] = {}
114 self._pending_responses: Dict[str, threading.Event] = {}
115 self._response_data: Dict[str, Any] = {}
116 self._recv_thread: Optional[threading.Thread] = None
118 # Stats
119 self._connected_at = 0.0
120 self._last_activity = 0.0
121 self._messages_sent = 0
122 self._messages_received = 0
123 self._bytes_sent = 0
124 self._bytes_received = 0
126 @property
127 def state(self) -> LinkState:
128 return self._state
130 @property
131 def is_connected(self) -> bool:
132 return self._state == LinkState.CONNECTED
134 @property
135 def is_encrypted(self) -> bool:
136 """E2E encryption active (PEER/RELAY trust only)."""
137 return self._session_key is not None
139 @property
140 def min_trust_level(self) -> TrustLevel:
141 """The minimum trust level established during this session (ratchet floor)."""
142 return self._min_trust_level
144 def set_trust(self, new_trust: TrustLevel) -> bool:
145 """Set trust level with ratchet enforcement (G9).
147 Trust can only be UPGRADED, never downgraded during a session.
148 Returns True if trust was set, False if downgrade was rejected.
149 """
150 if new_trust.trust_rank() < self._min_trust_level.trust_rank():
151 logger.warning(
152 f"Trust downgrade REJECTED for {self.peer_id[:8]}: "
153 f"{new_trust.value} < {self._min_trust_level.value} (ratchet)")
154 return False
155 self.trust = new_trust
156 # Ratchet upward
157 if new_trust.trust_rank() > self._min_trust_level.trust_rank():
158 self._min_trust_level = new_trust
159 logger.info(
160 f"Trust UPGRADED for {self.peer_id[:8]}: → {new_trust.value}")
161 return True
163 @property
164 def idle_seconds(self) -> float:
165 if self._last_activity == 0:
166 return 0
167 return time.time() - self._last_activity
169 def connect(self) -> bool:
170 """Initiate outgoing connection to peer."""
171 if self._state != LinkState.DISCONNECTED:
172 return self._state == LinkState.CONNECTED
174 self._state = LinkState.CONNECTING
175 try:
176 ws_url = self._resolve_ws_url()
178 try:
179 import websockets.sync.client as ws_client
180 self._ws = ws_client.connect(ws_url, open_timeout=10,
181 close_timeout=5)
182 except ImportError:
183 # Fallback: use websocket-client library
184 try:
185 import websocket
186 self._ws = websocket.WebSocket()
187 self._ws.connect(ws_url, timeout=10)
188 except ImportError:
189 logger.warning("No WebSocket library available (need websockets or websocket-client)")
190 self._state = LinkState.DISCONNECTED
191 return False
193 self._state = LinkState.HANDSHAKING
194 if not self._perform_handshake():
195 self.close()
196 return False
198 self._state = LinkState.CONNECTED
199 self._connected_at = time.time()
200 self._last_activity = time.time()
202 # Start receive loop
203 self._recv_thread = threading.Thread(
204 target=self._receive_loop, daemon=True,
205 name=f'peerlink-recv-{self.peer_id[:8]}')
206 self._recv_thread.start()
208 logger.info(f"PeerLink connected to {self.peer_id[:8]} "
209 f"(trust={self.trust.value}, encrypted={self.is_encrypted})")
210 return True
212 except Exception as e:
213 logger.debug(f"PeerLink connect failed to {self.peer_id[:8]}: {e}")
214 self._state = LinkState.DISCONNECTED
215 return False
217 def accept(self, ws, handshake_data: dict) -> bool:
218 """Accept incoming connection (called by link_manager's WS server)."""
219 self._ws = ws
220 self._state = LinkState.HANDSHAKING
222 try:
223 if not self._complete_handshake(handshake_data):
224 self.close()
225 return False
227 self._state = LinkState.CONNECTED
228 self._connected_at = time.time()
229 self._last_activity = time.time()
231 self._recv_thread = threading.Thread(
232 target=self._receive_loop, daemon=True,
233 name=f'peerlink-recv-{self.peer_id[:8]}')
234 self._recv_thread.start()
236 logger.info(f"PeerLink accepted from {self.peer_id[:8]} "
237 f"(trust={self.trust.value}, encrypted={self.is_encrypted})")
238 return True
239 except Exception as e:
240 logger.debug(f"PeerLink accept failed: {e}")
241 self._state = LinkState.DISCONNECTED
242 return False
244 def send(self, channel: str, data: dict, wait_response: bool = False,
245 timeout: float = 30.0) -> Optional[dict]:
246 """Send JSON message on a channel.
248 Args:
249 channel: Channel name (gossip, federation, compute, etc.)
250 data: JSON-serializable dict
251 wait_response: If True, block until response received
252 timeout: Max wait time for response
254 Returns:
255 Response dict if wait_response=True, else None
256 """
257 if not self.is_connected or self._ws is None:
258 return None
260 msg_id = uuid.uuid4().hex[:12]
261 frame = json.dumps({
262 'ch': channel,
263 'id': msg_id,
264 'd': data,
265 }, separators=(',', ':'))
267 frame_bytes = frame.encode('utf-8')
269 # Encrypt for PEER/RELAY trust
270 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self._session_key:
271 frame_bytes = self._encrypt(frame_bytes)
273 event = None
274 if wait_response:
275 event = threading.Event()
276 self._pending_responses[msg_id] = event
278 try:
279 self._ws_send(frame_bytes)
280 self._messages_sent += 1
281 self._bytes_sent += len(frame_bytes)
282 self._last_activity = time.time()
283 except Exception as e:
284 logger.debug(f"PeerLink send failed: {e}")
285 self._pending_responses.pop(msg_id, None)
286 self._handle_disconnect()
287 return None
289 if event:
290 event.wait(timeout=timeout)
291 self._pending_responses.pop(msg_id, None)
292 return self._response_data.pop(msg_id, None)
294 return None
296 def send_binary(self, channel: str, data: bytes) -> bool:
297 """Send binary data on a channel (sensor frames, etc.)."""
298 if not self.is_connected or self._ws is None:
299 return False
301 ch_id = CHANNEL_IDS.get(channel, 0xFF)
302 msg_id_bytes = struct.pack('>I', hash(time.time()) & 0xFFFFFFFF)
303 frame = bytes([ch_id]) + msg_id_bytes + data
305 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self._session_key:
306 frame = self._encrypt(frame)
308 try:
309 self._ws_send_binary(frame)
310 self._messages_sent += 1
311 self._bytes_sent += len(frame)
312 self._last_activity = time.time()
313 return True
314 except Exception:
315 self._handle_disconnect()
316 return False
318 def on_message(self, channel: str, handler: Callable) -> None:
319 """Register handler for incoming messages on a channel."""
320 if channel not in self._message_handlers:
321 self._message_handlers[channel] = []
322 self._message_handlers[channel].append(handler)
324 def _verify_same_user_proof(self, proof: str, peer_public_key: str) -> bool:
325 """Verify that the peer is owned by the same user.
327 Proof = peer signs our user_id with their Ed25519 key, and we
328 verify that their user_id matches ours. This prevents an attacker
329 from claiming SAME_USER trust without holding the user's key.
330 """
331 try:
332 from security.node_integrity import verify_message_signature
333 # The proof should be a signature of the local user_id
334 local_user_id = os.environ.get('HEVOLVE_USER_ID', '')
335 if not local_user_id or not peer_public_key:
336 return False
337 return verify_message_signature(peer_public_key, local_user_id, proof)
338 except (ImportError, Exception) as e:
339 logger.debug(f"SAME_USER proof verification failed: {e}")
340 return False
342 def close(self) -> None:
343 """Close the connection."""
344 if self._state == LinkState.CLOSING:
345 return
347 # Send bye BEFORE setting state — send() checks is_connected (state==CONNECTED)
348 try:
349 if self._ws:
350 self.send('control', {'type': 'bye'})
351 except Exception:
352 pass
354 self._state = LinkState.CLOSING
356 try:
357 if self._ws:
358 if hasattr(self._ws, 'close'):
359 self._ws.close()
360 except Exception:
361 pass
363 self._ws = None
364 self._session_key = None
365 self._state = LinkState.DISCONNECTED
366 logger.debug(f"PeerLink closed: {self.peer_id[:8]}")
368 def get_stats(self) -> dict:
369 return {
370 'peer_id': self.peer_id,
371 'state': self._state.value,
372 'trust': self.trust.value,
373 'encrypted': self.is_encrypted,
374 'connected_seconds': (time.time() - self._connected_at) if self._connected_at else 0,
375 'idle_seconds': self.idle_seconds,
376 'messages_sent': self._messages_sent,
377 'messages_received': self._messages_received,
378 'bytes_sent': self._bytes_sent,
379 'bytes_received': self._bytes_received,
380 'capabilities': self.capabilities,
381 }
383 # --- Internal: Handshake -------------------------------------------
385 def _perform_handshake(self) -> bool:
386 """Outgoing handshake: send our identity, receive theirs."""
387 try:
388 from security.node_integrity import get_public_key_hex, sign_json_payload
389 from security.channel_encryption import get_x25519_public_hex
390 except ImportError:
391 logger.warning("Security modules not available — handshake failed")
392 return False
394 hello = {
395 'type': 'hello',
396 'node_id': self.peer_id, # Will be overwritten with OUR id below
397 'ed25519_public': get_public_key_hex(),
398 'x25519_public': get_x25519_public_hex(),
399 'trust_requested': self.trust.value,
400 'protocol_version': 1,
401 'timestamp': time.time(),
402 }
404 # Get our actual node_id
405 try:
406 from security.node_integrity import get_node_identity
407 identity = get_node_identity()
408 hello['node_id'] = identity.get('node_id', '')
409 except Exception:
410 pass
412 # Attach pre-trust contract (proves we agreed to hive terms)
413 try:
414 from security.pre_trust_contract import get_pre_trust_verifier
415 from security.node_integrity import get_node_identity
416 nid = get_node_identity().get('node_id', '')
417 verifier = get_pre_trust_verifier()
418 contract = verifier.get_contract(nid)
419 if contract:
420 hello['trust_contract'] = contract
421 except Exception:
422 pass # Contract optional for SAME_USER trust
424 hello['signature'] = sign_json_payload(hello)
426 # Send hello
427 hello_bytes = json.dumps(hello, separators=(',', ':')).encode('utf-8')
428 self._ws_send(hello_bytes)
430 # Receive hello back
431 resp_bytes = self._ws_recv(timeout=10)
432 if not resp_bytes:
433 return False
435 try:
436 resp = json.loads(resp_bytes if isinstance(resp_bytes, str)
437 else resp_bytes.decode('utf-8'))
438 except (json.JSONDecodeError, UnicodeDecodeError):
439 return False
441 if resp.get('type') != 'hello_ack':
442 return False
444 # Verify their Ed25519 signature
445 peer_ed25519 = resp.get('ed25519_public', '')
446 peer_sig = resp.pop('signature', '')
447 if peer_ed25519 and peer_sig:
448 from security.node_integrity import verify_json_signature
449 if not verify_json_signature(peer_ed25519, resp, peer_sig):
450 logger.warning(f"Handshake signature verification failed for {self.peer_id[:8]}")
451 return False
452 elif os.environ.get('HEVOLVE_ENFORCEMENT_MODE') == 'hard':
453 # Hard mode: reject unsigned handshakes
454 logger.warning(f"Unsigned handshake rejected (hard enforcement) for {self.peer_id[:8]}")
455 return False
457 # Store peer's keys
458 self.peer_ed25519_public = peer_ed25519
459 self.peer_x25519_public = resp.get('x25519_public', '')
460 self.capabilities = resp.get('capabilities', {})
462 # Derive session key for PEER/RELAY trust
463 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self.peer_x25519_public:
464 self._derive_session_key()
466 return True
468 def _complete_handshake(self, hello_data: dict) -> bool:
469 """Incoming handshake: we received their hello, send ack."""
470 try:
471 from security.node_integrity import (
472 get_public_key_hex, sign_json_payload, verify_json_signature)
473 from security.channel_encryption import get_x25519_public_hex
474 except ImportError:
475 return False
477 # Verify their signature
478 peer_sig = hello_data.pop('signature', '')
479 peer_ed25519 = hello_data.get('ed25519_public', '')
480 if peer_ed25519 and peer_sig:
481 if not verify_json_signature(peer_ed25519, hello_data, peer_sig):
482 logger.warning("Incoming handshake signature verification failed")
483 return False
484 elif os.environ.get('HEVOLVE_ENFORCEMENT_MODE') == 'hard':
485 logger.warning("Unsigned incoming handshake rejected (hard enforcement)")
486 return False
488 self.peer_ed25519_public = peer_ed25519
489 self.peer_x25519_public = hello_data.get('x25519_public', '')
490 self.capabilities = hello_data.get('capabilities', {})
492 # Determine trust LOCALLY — never accept trust_requested from wire.
493 # SAME_USER requires proof: peer must present a user_id_signature
494 # signed by the same user key we hold. Without proof → PEER.
495 # G9: Trust ratchet enforced via set_trust() — cannot downgrade.
496 requested_trust = hello_data.get('trust_requested', 'peer')
497 if requested_trust == 'same_user':
498 # Verify SAME_USER claim cryptographically
499 user_proof = hello_data.get('user_id_proof', '')
500 if user_proof and self._verify_same_user_proof(user_proof, peer_ed25519):
501 if not self.set_trust(TrustLevel.SAME_USER):
502 logger.warning("Trust ratchet rejected SAME_USER (should not happen — upgrade)")
503 return False
504 else:
505 logger.warning("SAME_USER trust requested but no valid proof — setting PEER")
506 if not self.set_trust(TrustLevel.PEER):
507 logger.warning("Trust ratchet rejected PEER downgrade from higher level")
508 return False
509 else:
510 if not self.set_trust(TrustLevel.PEER):
511 logger.warning("Trust ratchet rejected PEER downgrade from higher level")
512 return False
514 # Verify pre-trust contract for PEER connections
515 # SAME_USER (own devices) are exempt — trust is user identity based
516 if self.trust != TrustLevel.SAME_USER:
517 try:
518 from security.pre_trust_contract import (
519 verify_trust_contract, TrustContract,
520 get_pre_trust_verifier,
521 )
522 contract_data = hello_data.get('trust_contract')
523 if contract_data:
524 contract = TrustContract(**{
525 k: v for k, v in contract_data.items()
526 if k in TrustContract.__dataclass_fields__
527 })
528 ok, msg = verify_trust_contract(contract)
529 if not ok:
530 logger.warning(
531 f"Pre-trust contract rejected for "
532 f"{self.peer_id[:8]}: {msg}")
533 return False
534 # Register verified contract
535 get_pre_trust_verifier().register_contract(contract)
536 logger.info(
537 f"Pre-trust contract verified for {self.peer_id[:8]}")
538 except ImportError:
539 pass # Module not available — allow legacy connections
541 # Send ack
542 ack = {
543 'type': 'hello_ack',
544 'ed25519_public': get_public_key_hex(),
545 'x25519_public': get_x25519_public_hex(),
546 'protocol_version': 1,
547 'capabilities': self._get_local_capabilities(),
548 'timestamp': time.time(),
549 }
550 ack['signature'] = sign_json_payload(ack)
552 ack_bytes = json.dumps(ack, separators=(',', ':')).encode('utf-8')
553 self._ws_send(ack_bytes)
555 # Derive session key for PEER/RELAY trust
556 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self.peer_x25519_public:
557 self._derive_session_key()
559 return True
561 def _derive_session_key(self):
562 """Derive AES-256-GCM session key from X25519 ECDH."""
563 try:
564 from security.channel_encryption import get_x25519_keypair
565 from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey
566 from cryptography.hazmat.primitives.kdf.hkdf import HKDF
567 from cryptography.hazmat.primitives import hashes
569 our_private, _ = get_x25519_keypair()
570 peer_pub = X25519PublicKey.from_public_bytes(
571 bytes.fromhex(self.peer_x25519_public))
572 shared_secret = our_private.exchange(peer_pub)
574 self._session_key = HKDF(
575 algorithm=hashes.SHA256(),
576 length=32,
577 salt=b'hart-peerlink-session-v1',
578 info=b'hart-peerlink-v1',
579 ).derive(shared_secret)
581 self._key_established_at = time.time()
582 self._session_nonce_counter = 0
583 logger.debug(f"Session key derived for {self.peer_id[:8]}")
584 except Exception as e:
585 logger.warning(f"Session key derivation failed: {e}")
586 self._session_key = None
588 # --- Internal: Encryption ------------------------------------------
590 def _encrypt(self, plaintext: bytes) -> bytes:
591 """Encrypt with session key (AES-256-GCM). Prepends nonce."""
592 if not self._session_key:
593 return plaintext
595 from cryptography.hazmat.primitives.ciphers.aead import AESGCM
597 # Check key rotation
598 if (time.time() - self._key_established_at) > KEY_ROTATION_INTERVAL:
599 self._derive_session_key()
601 nonce = os.urandom(12)
602 ct = AESGCM(self._session_key).encrypt(nonce, plaintext, None)
603 return nonce + ct # 12 bytes nonce + ciphertext
605 def _decrypt(self, data: bytes) -> Optional[bytes]:
606 """Decrypt with session key. Expects nonce prefix."""
607 if not self._session_key:
608 return data
609 if len(data) < 13: # 12 nonce + at least 1 byte
610 return None
612 from cryptography.hazmat.primitives.ciphers.aead import AESGCM
614 nonce = data[:12]
615 ct = data[12:]
616 try:
617 return AESGCM(self._session_key).decrypt(nonce, ct, None)
618 except Exception as e:
619 logger.debug(f"Decrypt failed: {e}")
620 return None
622 # --- Internal: WebSocket I/O ---------------------------------------
624 def _resolve_ws_url(self) -> str:
625 """Resolve address to WebSocket URL."""
626 addr = self.address
627 if addr.startswith('ws://') or addr.startswith('wss://'):
628 return addr
629 # Default: plain WS for LAN, secure for WAN
630 if self.trust == TrustLevel.SAME_USER:
631 return f'ws://{addr}/peer_link'
632 return f'ws://{addr}/peer_link' # TLS handled at transport level if needed
634 def _ws_send(self, data: bytes) -> None:
635 """Send bytes over WebSocket (handles different libraries)."""
636 if self._ws is None:
637 raise ConnectionError("WebSocket not connected")
638 if hasattr(self._ws, 'send'):
639 self._ws.send(data)
641 def _ws_send_binary(self, data: bytes) -> None:
642 """Send binary data over WebSocket."""
643 if self._ws is None:
644 raise ConnectionError("WebSocket not connected")
645 if hasattr(self._ws, 'send'):
646 # websockets library
647 self._ws.send(data)
649 def _ws_recv(self, timeout: float = 30.0) -> Optional[bytes]:
650 """Receive from WebSocket with timeout."""
651 if self._ws is None:
652 return None
653 try:
654 if hasattr(self._ws, 'recv'):
655 # websockets sync client has recv(timeout)
656 try:
657 return self._ws.recv(timeout=timeout)
658 except TypeError:
659 # websocket-client doesn't have timeout param on recv
660 self._ws.settimeout(timeout)
661 return self._ws.recv()
662 except Exception:
663 return None
665 def _receive_loop(self):
666 """Background thread: receive and dispatch messages."""
667 while self._state == LinkState.CONNECTED and self._ws is not None:
668 try:
669 raw = self._ws_recv(timeout=60)
670 if raw is None:
671 continue
673 if isinstance(raw, str):
674 raw = raw.encode('utf-8')
676 # Decrypt if needed
677 if self.trust in (TrustLevel.PEER, TrustLevel.RELAY) and self._session_key:
678 decrypted = self._decrypt(raw)
679 if decrypted is None:
680 continue
681 raw = decrypted
683 self._messages_received += 1
684 self._bytes_received += len(raw)
685 self._last_activity = time.time()
687 # Try JSON (text message)
688 try:
689 msg = json.loads(raw.decode('utf-8') if isinstance(raw, bytes) else raw)
690 channel = msg.get('ch', 'control')
691 msg_id = msg.get('id', '')
692 data = msg.get('d', {})
694 # Check if this is a response to a pending request
695 if msg.get('re') and msg['re'] in self._pending_responses:
696 self._response_data[msg['re']] = data
697 self._pending_responses[msg['re']].set()
698 continue
700 # Dispatch to handlers
701 handlers = self._message_handlers.get(channel, [])
702 for handler in handlers:
703 try:
704 handler(channel, data, self.peer_id)
705 except Exception as e:
706 logger.debug(f"Handler error on {channel}: {e}")
707 continue
708 except (json.JSONDecodeError, UnicodeDecodeError):
709 pass
711 # Binary message
712 if len(raw) >= 5:
713 ch_id = raw[0]
714 channel = CHANNEL_NAMES.get(ch_id, 'unknown')
715 payload = raw[5:] # skip channel_id + msg_id_hash
716 handlers = self._message_handlers.get(channel, [])
717 for handler in handlers:
718 try:
719 handler(channel, payload, self.peer_id)
720 except Exception as e:
721 logger.debug(f"Binary handler error on {channel}: {e}")
723 except Exception as e:
724 if self._state == LinkState.CONNECTED:
725 logger.debug(f"Receive loop error: {e}")
726 self._handle_disconnect()
727 break
729 def _handle_disconnect(self):
730 """Handle unexpected disconnection."""
731 if self._state != LinkState.CONNECTED:
732 return
733 self._state = LinkState.DISCONNECTED
734 self._ws = None
735 self._session_key = None
736 logger.info(f"PeerLink disconnected: {self.peer_id[:8]}")
738 @staticmethod
739 def _get_local_capabilities() -> dict:
740 """Get local node capabilities for handshake."""
741 caps = {'cpu_count': os.cpu_count() or 1}
742 try:
743 from integrations.service_tools.vram_manager import detect_gpu
744 gpu = detect_gpu()
745 if gpu.get('available'):
746 caps['gpu'] = gpu.get('device_name', 'GPU')
747 caps['vram_mb'] = gpu.get('vram_total_mb', 0)
748 except Exception:
749 pass
750 try:
751 from security.key_delegation import get_node_tier
752 caps['tier'] = get_node_tier()
753 except Exception:
754 caps['tier'] = 'flat'
755 return caps