# Coverage for core/peer_link/channels.py: 100.0% (55 statements)
# Report generated by coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Channel definitions — what data flows on each PeerLink channel.

Data classification determines encryption behavior:
  OPEN: Peer list exchange, health — no secrets. Encrypted only on PEER/RELAY trust.
  PRIVATE: User prompts, compute results — always encrypted on cross-user links.
  SYSTEM: Control messages, heartbeat — no secrets.

Within same-user (ANY network — LAN, WAN, regional): all channels are
unencrypted (your own devices, trust based on authenticated user_id match).
Across users: PRIVATE channels are E2E encrypted at the link layer.
12"""
13import logging
14import threading
15from typing import Any, Callable, Dict, List, Optional
# Module-level logger, registered under the 'hevolve.peer_link' name.
logger = logging.getLogger('hevolve.peer_link')
class DataClass:
    """Privacy tier assigned to each channel.

    The values are plain strings, so they compare equal to the literal
    stored in a channel's ``data_class`` field.
    """

    # Public information such as peer lists and health data.
    OPEN = 'open'
    # User-owned content: prompts, responses, compute results.
    PRIVATE = 'private'
    # Infrastructure traffic: heartbeat, control, telemetry metadata.
    SYSTEM = 'system'
# Channel registry, keyed by channel name. Every entry carries:
#   id          - numeric channel identifier
#   data_class  - one of the DataClass tiers
#   priority    - 0 is the highest priority, 5 the lowest
#   reliable    - False marks a loss-tolerant channel
#   description - human-readable summary of the traffic
CHANNEL_REGISTRY = {
    # Link management traffic; highest priority.
    'control': {
        'id': 0x00,
        'data_class': DataClass.SYSTEM,
        'priority': 0,
        'reliable': True,
        'description': 'Handshake, heartbeat, disconnect, capability updates',
    },
    # Prompts are private.
    'compute': {
        'id': 0x01,
        'data_class': DataClass.PRIVATE,
        'priority': 1,
        'reliable': True,
        'description': 'Inference offload — prompts, results, model status',
    },
    # Agent tasks are private.
    'dispatch': {
        'id': 0x02,
        'data_class': DataClass.PRIVATE,
        'priority': 1,
        'reliable': True,
        'description': 'Agent task dispatch — goal execution, streaming results',
    },
    # Peer lists are public; loss-tolerant (retried on the next round).
    'gossip': {
        'id': 0x03,
        'data_class': DataClass.OPEN,
        'priority': 2,
        'reliable': False,
        'description': 'Peer list exchange, announce, health check',
    },
    # Federated posts are public.
    'federation': {
        'id': 0x04,
        'data_class': DataClass.OPEN,
        'priority': 3,
        'reliable': True,
        'description': 'Federated post delivery, follow/unfollow',
    },
    # Thought vectors are private.
    'hivemind': {
        'id': 0x05,
        'data_class': DataClass.PRIVATE,
        'priority': 1,
        'reliable': True,
        'description': 'HiveMind distributed thought — query, fuse, respond',
    },
    # Theme changes and config are public.
    'events': {
        'id': 0x06,
        'data_class': DataClass.OPEN,
        'priority': 4,
        'reliable': False,
        'description': 'EventBus cross-device — theme, config, lifecycle',
    },
    # Skill availability is public.
    'ralt': {
        'id': 0x07,
        'data_class': DataClass.OPEN,
        'priority': 3,
        'reliable': True,
        'description': 'RALT skill distribution — availability, ingestion trigger',
    },
    # Camera/screen frames are private; bulk data, lowest priority.
    'sensor': {
        'id': 0x08,
        'data_class': DataClass.PRIVATE,
        'priority': 5,
        'reliable': False,
        'description': 'Sensor frames — camera, screen, audio for HevolveAI learning',
    },
    # DMs are always private; interactive, so chat latency must feel instant.
    'messages': {
        'id': 0x09,
        'data_class': DataClass.PRIVATE,
        'priority': 1,
        'reliable': True,
        'description': 'P2P DMs, channel messages, reactions, read receipts, edits',
    },
}
# Inverse of the registry: numeric channel id -> channel name.
CHANNEL_ID_TO_NAME = {
    cfg['id']: name for name, cfg in CHANNEL_REGISTRY.items()
}

# Compatibility aliases — CHANNEL_IDS (name→id) and CHANNEL_NAMES (id→name)
# are re-exported from link.py; both are derived from CHANNEL_REGISTRY.
# CHANNEL_NAMES is the very same dict object as CHANNEL_ID_TO_NAME.
CHANNEL_IDS = {
    name: cfg['id'] for name, cfg in CHANNEL_REGISTRY.items()
}
CHANNEL_NAMES = CHANNEL_ID_TO_NAME
def get_channel_config(channel: str) -> dict:
    """Look up the registry entry for *channel*.

    Returns the entry stored in CHANNEL_REGISTRY itself (not a copy), or a
    new empty dict when the channel name is not registered.
    """
    try:
        return CHANNEL_REGISTRY[channel]
    except KeyError:
        return {}
def is_private_channel(channel: str) -> bool:
    """Tell whether *channel* is classified as DataClass.PRIVATE.

    Private channels are the ones that require E2E encryption on
    cross-user links. Unknown channel names yield False.
    """
    entry = CHANNEL_REGISTRY.get(channel)
    if entry is None:
        return False
    return entry.get('data_class') == DataClass.PRIVATE
class ChannelDispatcher:
    """Routes incoming PeerLink messages to registered handlers.

    Each subsystem registers its handler:
        dispatcher.register('gossip', peer_discovery.handle_exchange)
        dispatcher.register('federation', federation.receive_inbox)

    When a message arrives on a channel, all registered handlers are called.
    The handler table is only read or modified while holding an internal
    lock; the handlers themselves are invoked after the lock is released.
    """

    def __init__(self):
        # channel name -> handlers, in registration order
        self._handlers: Dict[str, List[Callable]] = {}
        self._lock = threading.Lock()

    def register(self, channel: str, handler: Callable) -> None:
        """Register a handler for a channel.

        Handler signature: handler(data: dict, sender_peer_id: str) -> Optional[dict]
        If handler returns a dict, it's sent back as response.
        Registering the same handler twice makes it run twice per message.
        """
        with self._lock:
            self._handlers.setdefault(channel, []).append(handler)

    def unregister(self, channel: str, handler: Callable) -> None:
        """Remove one registration of *handler* from *channel*.

        A no-op when the channel or the handler is not registered.
        """
        with self._lock:
            handlers = self._handlers.get(channel)
            if not handlers:
                return
            try:
                handlers.remove(handler)
            except ValueError:
                return
            # Drop the now-empty list so stale channel keys do not pile up.
            if not handlers:
                del self._handlers[channel]

    def dispatch(self, channel: str, data: Any,
                 sender_peer_id: str) -> Optional[dict]:
        """Dispatch a message to all handlers for a channel.

        Every handler is called, in registration order, even after one has
        already produced a response. A handler that raises is logged and
        skipped so the remaining handlers still run.

        Returns first non-None response (for request-response patterns),
        or None when no handler produced one.
        """
        # Snapshot under the lock so handlers may register/unregister
        # while the dispatch is in progress.
        with self._lock:
            handlers = list(self._handlers.get(channel, ()))

        response = None
        for handler in handlers:
            try:
                result = handler(data, sender_peer_id)
            except Exception as e:
                # Best-effort delivery: keep going, but make the failure
                # visible (WARNING + traceback) instead of hiding it at DEBUG.
                logger.warning("Channel %s handler error: %s", channel, e,
                               exc_info=True)
                continue
            if response is None and result is not None:
                response = result

        return response

    def has_handlers(self, channel: str) -> bool:
        """Return True if at least one handler is registered for *channel*."""
        with self._lock:
            return bool(self._handlers.get(channel))

    def get_registered_channels(self) -> List[str]:
        """Return the names of all channels that currently have a handler."""
        with self._lock:
            return [ch for ch, handlers in self._handlers.items() if handlers]
# Process-wide dispatcher instance, created on first use.
_dispatcher: Optional[ChannelDispatcher] = None
_dispatcher_lock = threading.Lock()


def get_channel_dispatcher() -> ChannelDispatcher:
    """Return the shared ChannelDispatcher, creating it on the first call.

    An existing instance is returned without taking the lock. Otherwise the
    lock is taken and the check repeated before constructing the instance.
    """
    global _dispatcher
    if _dispatcher is not None:
        return _dispatcher
    with _dispatcher_lock:
        # Re-check: the instance may have been created while we waited.
        if _dispatcher is None:
            _dispatcher = ChannelDispatcher()
        return _dispatcher