Coverage for core / peer_link / channels.py: 100.0%

55 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Channel definitions — what data flows on each PeerLink channel. 

3 

4Data classification determines encryption behavior: 

5 OPEN: Peer list exchange, health — no secrets. Encrypted only on PEER/RELAY trust. 

6 PRIVATE: User prompts, compute results — always encrypted on cross-user links. 

7 SYSTEM: Control messages, heartbeat — no secrets. 

8 

9Within same-user (ANY network — LAN, WAN, regional): all channels are 

10unencrypted (your own devices, trust based on authenticated user_id match). 

11Across users: PRIVATE channels are E2E encrypted at the link layer. 

12""" 

13import logging 

14import threading 

15from typing import Any, Callable, Dict, List, Optional 

16 

17logger = logging.getLogger('hevolve.peer_link') 

18 

19 

class DataClass:
    """Privacy classification assigned to each PeerLink channel."""

    # Public data (peer lists, health)
    OPEN = 'open'
    # User data (prompts, responses, compute results)
    PRIVATE = 'private'
    # Infrastructure (heartbeat, control, telemetry metadata)
    SYSTEM = 'system'

25 

26 

# Channel registry: channel name -> config.
# Every entry carries the same five keys:
#   id          numeric channel id (unique per channel in this table)
#   data_class  one of the DataClass constants
#   priority    0 is the highest priority, larger numbers are lower
#   reliable    False marks the channel as loss-tolerant
#   description human-readable summary of the traffic on the channel
CHANNEL_REGISTRY = {
    'control': {
        'id': 0x00,
        'data_class': DataClass.SYSTEM,
        'priority': 0,  # Highest priority
        'reliable': True,
        'description': 'Handshake, heartbeat, disconnect, capability updates',
    },
    'compute': {
        'id': 0x01,
        'data_class': DataClass.PRIVATE,  # Prompts are private
        'priority': 1,
        'reliable': True,
        'description': 'Inference offload — prompts, results, model status',
    },
    'dispatch': {
        'id': 0x02,
        'data_class': DataClass.PRIVATE,  # Agent tasks are private
        'priority': 1,
        'reliable': True,
        'description': 'Agent task dispatch — goal execution, streaming results',
    },
    'gossip': {
        'id': 0x03,
        'data_class': DataClass.OPEN,  # Peer lists are public
        'priority': 2,
        'reliable': False,  # Loss-tolerant (retry next round)
        'description': 'Peer list exchange, announce, health check',
    },
    'federation': {
        'id': 0x04,
        'data_class': DataClass.OPEN,  # Federated posts are public
        'priority': 3,
        'reliable': True,
        'description': 'Federated post delivery, follow/unfollow',
    },
    'hivemind': {
        'id': 0x05,
        'data_class': DataClass.PRIVATE,  # Thought vectors are private
        'priority': 1,
        'reliable': True,
        'description': 'HiveMind distributed thought — query, fuse, respond',
    },
    'events': {
        'id': 0x06,
        'data_class': DataClass.OPEN,  # Theme changes, config are public
        'priority': 4,
        'reliable': False,
        'description': 'EventBus cross-device — theme, config, lifecycle',
    },
    'ralt': {
        'id': 0x07,
        'data_class': DataClass.OPEN,  # Skill availability is public
        'priority': 3,
        'reliable': True,
        'description': 'RALT skill distribution — availability, ingestion trigger',
    },
    'sensor': {
        'id': 0x08,
        'data_class': DataClass.PRIVATE,  # Camera/screen frames are private
        'priority': 5,  # Lowest priority (bulk data)
        'reliable': False,
        'description': 'Sensor frames — camera, screen, audio for HevolveAI learning',
    },
    'messages': {
        'id': 0x09,
        'data_class': DataClass.PRIVATE,  # DMs are always private
        'priority': 1,  # Interactive — chat latency must feel instant
        'reliable': True,
        'description': 'P2P DMs, channel messages, reactions, read receipts, edits',
    },
}

100 

# Reverse lookup: numeric channel id -> channel name.
CHANNEL_ID_TO_NAME = {
    cfg['id']: name for name, cfg in CHANNEL_REGISTRY.items()
}

# Compatibility aliases — CHANNEL_IDS (name→id) and CHANNEL_NAMES (id→name)
# are re-exported from link.py; both are derived from CHANNEL_REGISTRY.
CHANNEL_IDS = {name: cfg['id'] for name, cfg in CHANNEL_REGISTRY.items()}
# Same dict object as CHANNEL_ID_TO_NAME, not a copy.
CHANNEL_NAMES = CHANNEL_ID_TO_NAME

108 

109 

def get_channel_config(channel: str) -> dict:
    """Look up the registry entry for ``channel``.

    Returns the entry stored in CHANNEL_REGISTRY itself (not a copy), or a
    fresh empty dict when the channel name is not registered.
    """
    try:
        return CHANNEL_REGISTRY[channel]
    except KeyError:
        return {}

113 

114 

def is_private_channel(channel: str) -> bool:
    """Tell whether ``channel`` is classified as DataClass.PRIVATE.

    Private channels are the ones that require E2E encryption on cross-user
    links.
    """
    entry = CHANNEL_REGISTRY.get(channel)
    if entry is None:
        # NOTE(review): an unregistered channel name is reported as
        # non-private; confirm callers never send user data on such a channel.
        return False
    return entry.get('data_class') == DataClass.PRIVATE

119 

120 

class ChannelDispatcher:
    """Routes incoming PeerLink messages to registered handlers.

    Each subsystem registers its handler:
        dispatcher.register('gossip', peer_discovery.handle_exchange)
        dispatcher.register('federation', federation.receive_inbox)

    When a message arrives on a channel, all handlers registered for that
    channel are called in registration order.
    """

    def __init__(self):
        # channel name -> handlers, kept in registration order
        self._handlers: Dict[str, List[Callable]] = {}
        # Held around every read and write of ``_handlers``
        self._lock = threading.Lock()

    def register(self, channel: str, handler: Callable) -> None:
        """Register a handler for a channel.

        Handler signature: handler(data: dict, sender_peer_id: str) -> Optional[dict]
        If handler returns a dict, it's sent back as response.

        Registering the same handler twice adds it twice, so it is then
        called twice per dispatched message.
        """
        with self._lock:
            self._handlers.setdefault(channel, []).append(handler)

    def unregister(self, channel: str, handler: Callable) -> None:
        """Remove one registration of ``handler`` from ``channel``.

        A no-op when the channel is unknown or the handler is not registered.
        """
        with self._lock:
            try:
                self._handlers.get(channel, []).remove(handler)
            except ValueError:
                # Handler was not registered on this channel.
                pass

    def dispatch(self, channel: str, data: Any,
                 sender_peer_id: str) -> Optional[dict]:
        """Dispatch a message to all handlers for a channel.

        Every handler is called even after a response has been obtained.
        An exception raised by a handler is logged with its traceback and
        does not prevent the remaining handlers from running.

        Returns:
            The first non-None handler result (for request-response
            patterns), or None when no handler produced one.
        """
        with self._lock:
            # Snapshot under the lock; handlers then run without it held.
            handlers = list(self._handlers.get(channel, ()))

        response = None
        for handler in handlers:
            try:
                result = handler(data, sender_peer_id)
            except Exception as exc:
                # Log at WARNING with the traceback: a failing subsystem
                # handler must be visible in normal logs, not only at DEBUG.
                logger.warning("Channel %s handler error: %s",
                               channel, exc, exc_info=True)
                continue
            if response is None and result is not None:
                response = result

        return response

    def has_handlers(self, channel: str) -> bool:
        """Return True when at least one handler is registered for ``channel``."""
        with self._lock:
            return bool(self._handlers.get(channel))

    def get_registered_channels(self) -> List[str]:
        """Return the names of channels that currently have a handler."""
        with self._lock:
            return [ch for ch, handlers in self._handlers.items() if handlers]

180 

181 

# Module-level singleton: created on the first get_channel_dispatcher() call.
_dispatcher: Optional[ChannelDispatcher] = None
# Serializes creation of the singleton.
_dispatcher_lock = threading.Lock()


def get_channel_dispatcher() -> ChannelDispatcher:
    """Return the module-wide ChannelDispatcher, creating it on first use."""
    global _dispatcher
    if _dispatcher is not None:
        return _dispatcher
    with _dispatcher_lock:
        # Re-check under the lock: another thread may have created the
        # instance between the check above and acquiring the lock.
        if _dispatcher is None:
            _dispatcher = ChannelDispatcher()
        return _dispatcher