Coverage for core / peer_link / telemetry.py: 92.2%

192 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Telemetry & Safety — Crossbar connection for central monitoring. 

3 

4NOT optional. Three functions: 

5 1. Telemetry: node -> central (traffic stats, compute metrics, health) 

6 2. Control: central -> nodes (upgrades, bans, emergency halt) 

7 3. Probe: central <-> node (diagnostics RPC) 

8 

9Central sees METADATA only — never message content. 

10 - Traffic volume per channel (msg count, byte count) 

11 - Peer connection topology 

12 - GPU/compute utilization 

13 - Economic flows (Spark earned/spent) 

14 - Security events (guardrail violations, integrity status) 

15 NOT message content — NEVER 

16 NOT user prompts/responses — NEVER 

17 NOT PeerLink E2E payloads — NEVER 

18 

19Safety: kill switch delivery via Crossbar (instant) + gossip (backup). 

20 Master key signed emergency_halt -> HiveCircuitBreaker.trip() 

21 Node disconnected >24h -> self-restrict (safety measure). 

22""" 

23import logging 

24import os 

25import threading 

26import time 

27from collections import defaultdict 

28from typing import Any, Callable, Dict, List, Optional 

29 

# All peer-link modules log under the shared 'hevolve.peer_link' logger name.
logger = logging.getLogger('hevolve.peer_link')

# How long (in seconds) a node may be cut off from central before
# CentralConnection.is_degraded() / is_restricted() start reporting True.
_DEGRADED_THRESHOLD = 60 * 60          # one hour  -> degraded mode
_RESTRICTED_THRESHOLD = 24 * 60 * 60   # one day   -> restricted mode

35 

36 

class TelemetryCollector:
    """Collects traffic and security metadata for central reporting.

    Aggregates per-channel message counts and byte totals, plus a bounded
    list of recent security events. All counters are reset every time
    :meth:`get_summary` is called (i.e. once per telemetry publish cycle).

    Every public method takes the internal lock, so the recorder methods
    may be called from several threads.
    """

    # Upper bound on buffered security events between two summaries.
    _MAX_EVENTS = 100
    # Security-event detail strings are truncated to this many characters.
    _MAX_DETAIL_CHARS = 200

    def __init__(self):
        self._lock = threading.Lock()
        self._traffic: Dict[str, Dict[str, int]] = defaultdict(
            self._new_counters)
        self._security_events: List[dict] = []

    @staticmethod
    def _new_counters() -> Dict[str, int]:
        """Return a zeroed counter record for one channel."""
        return {'sent': 0, 'recv': 0, 'bytes_sent': 0, 'bytes_recv': 0}

    def record_sent(self, channel: str, byte_count: int):
        """Count one outbound message of ``byte_count`` bytes on ``channel``."""
        with self._lock:
            counters = self._traffic[channel]
            counters['sent'] += 1
            counters['bytes_sent'] += byte_count

    def record_received(self, channel: str, byte_count: int):
        """Count one inbound message of ``byte_count`` bytes on ``channel``."""
        with self._lock:
            counters = self._traffic[channel]
            counters['recv'] += 1
            counters['bytes_recv'] += byte_count

    def record_security_event(self, event_type: str, details: str = ''):
        """Append a timestamped security event to the bounded event buffer.

        Args:
            event_type: Short event identifier (e.g. ``'peer_ban'``).
            details: Free-form description; truncated to 200 characters.
                ``None`` is stored as ``''`` and any other non-string value
                is converted with ``str()`` so that recording an event
                never raises on an unexpected payload.
        """
        if details is None:
            text = ''
        elif isinstance(details, str):
            text = details
        else:
            text = str(details)

        with self._lock:
            self._security_events.append({
                'type': event_type,
                'details': text[:self._MAX_DETAIL_CHARS],
                'timestamp': time.time(),
            })
            # Keep only the most recent events; drop the oldest in place.
            overflow = len(self._security_events) - self._MAX_EVENTS
            if overflow > 0:
                del self._security_events[:overflow]

    def get_summary(self) -> dict:
        """Return the accumulated metrics and reset all counters.

        Returns:
            A dict with two keys: ``'traffic'`` maps each channel name to
            its ``sent`` / ``recv`` / ``bytes_sent`` / ``bytes_recv``
            counters, and ``'security_events'`` is the list of buffered
            event dicts. Both are detached copies; the collector starts
            again from empty after this call.
        """
        with self._lock:
            traffic = dict(self._traffic)
            self._traffic = defaultdict(self._new_counters)
            events = list(self._security_events)
            self._security_events.clear()

        return {
            'traffic': {k: dict(v) for k, v in traffic.items()},
            'security_events': events,
        }

84 

85 

class CentralConnection:
    """Always-on connection to central Crossbar for telemetry and safety.

    Publishes telemetry every ``HEVOLVE_TELEMETRY_INTERVAL`` seconds
    (default 60) from a daemon thread.
    Dispatches control messages (emergency halt, peer bans, custom
    handlers) passed to :meth:`handle_control_message`.
    Tracks how long the node has been unable to reach central so callers
    can self-restrict via :meth:`is_degraded` / :meth:`is_restricted`.
    """

    # Fallback publish period (seconds) when the env var is unset/invalid.
    _DEFAULT_TELEMETRY_INTERVAL = 60

    def __init__(self):
        self._crossbar_url = os.environ.get(
            'CBURL', 'ws://aws_rasa.hertzai.com:8088/ws')
        self._realm = os.environ.get('CBREALM', 'realm1')
        self._connected = False
        # Wall-clock time at which central first became unreachable;
        # None while connected or before the first connection attempt.
        self._disconnected_since: Optional[float] = None
        # Not currently taken by any method of this class.
        self._lock = threading.Lock()
        self._telemetry = TelemetryCollector()
        self._control_handlers: List[Callable] = []
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._telemetry_interval = self._read_telemetry_interval()
        self._node_id = ''

    @staticmethod
    def _read_telemetry_interval() -> int:
        """Parse ``HEVOLVE_TELEMETRY_INTERVAL`` into a usable period.

        Returns:
            The configured number of seconds, never less than 1 (a zero
            or negative period would make the publish loop spin without
            sleeping). Falls back to 60 when the value is not an integer.
        """
        default = CentralConnection._DEFAULT_TELEMETRY_INTERVAL
        raw = os.environ.get('HEVOLVE_TELEMETRY_INTERVAL', str(default))
        try:
            interval = int(raw)
        except (TypeError, ValueError):
            logger.warning(
                f"Invalid HEVOLVE_TELEMETRY_INTERVAL {raw!r} — "
                f"using {default}s")
            return default
        return max(1, interval)

    @property
    def telemetry(self) -> 'TelemetryCollector':
        """The collector that other components record traffic into."""
        return self._telemetry

    @property
    def is_connected(self) -> bool:
        """True while an outbound transport to central is believed usable."""
        return self._connected

    def start(self):
        """Start the background telemetry thread (no-op if already running)."""
        if self._running:
            return

        # Get node identity; telemetry is still published as 'unknown'
        # when the integrity module is unavailable or fails.
        try:
            from security.node_integrity import get_node_identity
            self._node_id = get_node_identity().get('node_id', 'unknown')
        except Exception:
            self._node_id = 'unknown'

        self._running = True
        self._thread = threading.Thread(
            target=self._telemetry_loop, daemon=True,
            name='peerlink-telemetry')
        self._thread.start()
        logger.info("CentralConnection started")

    def stop(self):
        """Ask the telemetry thread to exit and wait up to 10 s for it."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    def on_control(self, handler: Callable) -> None:
        """Register handler for control messages from central.

        Handlers receive every control message whose type is not handled
        internally (i.e. anything except ``emergency_halt`` / ``peer_ban``).
        """
        self._control_handlers.append(handler)

    def _disconnected_seconds(self) -> float:
        """Seconds since central became unreachable, or 0.0 if connected."""
        # Snapshot once: the telemetry thread may reset the attribute to
        # None between a None-check and the subtraction.
        since = self._disconnected_since
        if self._connected or since is None:
            return 0.0
        return time.time() - since

    def is_degraded(self) -> bool:
        """Node is in degraded mode (>1h disconnected from central)."""
        return self._disconnected_seconds() > _DEGRADED_THRESHOLD

    def is_restricted(self) -> bool:
        """Node is in restricted mode (>24h disconnected from central)."""
        return self._disconnected_seconds() > _RESTRICTED_THRESHOLD

    def get_disconnection_hours(self) -> float:
        """Hours since central became unreachable (0.0 while connected)."""
        return self._disconnected_seconds() / 3600

    # --- Internal ------------------------------------------------

    def _telemetry_loop(self):
        """Background: publish telemetry, handle control messages."""
        while self._running:
            try:
                self._try_connect()
                if self._connected:
                    self._publish_telemetry()
                    self._check_control_messages()
            except Exception as e:
                logger.debug(f"Telemetry loop error: {e}")
                self._mark_disconnected()

            # Sleep in one-second slices so stop() is honoured promptly.
            for _ in range(self._telemetry_interval):
                if not self._running:
                    break
                time.sleep(1)

    def _try_connect(self):
        """Check if any outbound transport is available (WAMP or HTTP).

        Does nothing while already marked connected; a later publish
        failure is what flips the state back to disconnected.
        """
        if self._connected:
            return

        # Check WAMP session
        try:
            from crossbar_server import wamp_session
            if wamp_session is not None:
                self._mark_connected()
                return
        except ImportError:
            pass

        # Check if MessageBus has HTTP transport injected
        try:
            from core.peer_link.message_bus import get_message_bus
            bus = get_message_bus()
            if bus._http_transport is not None:
                self._mark_connected()
                return
        except Exception:
            pass

        self._mark_disconnected()

    def _mark_connected(self):
        """Record that central is reachable and clear the outage clock."""
        self._connected = True
        self._disconnected_since = None

    def _mark_disconnected(self):
        """Record that central is unreachable and start the outage clock.

        The clock is started on a connected -> disconnected transition and
        also when no outage has been recorded yet, so a node that has
        never managed to reach central still ages into degraded and
        restricted mode. An outage already in progress keeps its original
        start time.
        """
        if self._connected or self._disconnected_since is None:
            self._disconnected_since = time.time()
        self._connected = False

    @staticmethod
    def _collect_compute_metrics() -> dict:
        """Return GPU availability metrics, or ``{}`` if detection fails."""
        try:
            from integrations.service_tools.vram_manager import detect_gpu
            gpu = detect_gpu()
            return {
                'gpu_available': gpu.get('available', False),
                'gpu_name': gpu.get('device_name', ''),
                'vram_free_mb': gpu.get('vram_free_mb', 0),
            }
        except Exception:
            return {}

    @staticmethod
    def _collect_peer_link_metrics() -> dict:
        """Return peer-link counts, or ``{}`` if the manager is unavailable."""
        try:
            from core.peer_link.link_manager import get_link_manager
            status = get_link_manager().get_status()
            return {
                'active': status.get('active_links', 0),
                'encrypted': status.get('encrypted_links', 0),
                'total': status.get('total_links', 0),
            }
        except Exception:
            return {}

    @staticmethod
    def _collect_health_metrics() -> dict:
        """Return CPU count plus psutil CPU/memory percentages if available."""
        health = {'cpu_count': os.cpu_count() or 1}
        try:
            import psutil
            health['cpu_percent'] = psutil.cpu_percent()
            health['memory_percent'] = psutil.virtual_memory().percent
        except ImportError:
            pass
        except Exception as e:
            # A psutil runtime error must not be mistaken for a lost
            # connection by the telemetry loop.
            logger.debug(f"Health metrics unavailable: {e}")
        return health

    def _publish_telemetry(self):
        """Publish node telemetry to central (metadata only, never content).

        Note that the collector's counters are reset when the summary is
        taken, so the metrics of a cycle whose publish fails are dropped.
        """
        summary = self._telemetry.get_summary()

        telemetry = {
            'node_id': self._node_id,
            'timestamp': time.time(),
            'traffic': summary.get('traffic', {}),
            'security_events': summary.get('security_events', []),
            'compute': self._collect_compute_metrics(),
            'peer_links': self._collect_peer_link_metrics(),
            'health': self._collect_health_metrics(),
        }

        # Publish via MessageBus (handles WAMP → HTTP fallback internally)
        # Telemetry is central-only: skip PeerLink (no peer needs our metrics)
        try:
            from core.peer_link.message_bus import get_message_bus
            get_message_bus().publish('telemetry.node', telemetry,
                                      skip_peerlink=True)
        except Exception as e:
            logger.debug(f"Telemetry publish failed: {e}")
            self._mark_disconnected()

    def _check_control_messages(self):
        """Check for control messages from central (emergency halt, bans).

        In WAMP mode, control messages arrive via subscription
        (handled in crossbar_server.py @component.on_join).
        The HTTP polling fallback would live here; it is currently a
        placeholder that does nothing.
        """
        pass

    def handle_control_message(self, message: dict):
        """Process a control message from central.

        Called by crossbar_server.py when control broadcast received,
        or by gossip when control message gossip-forwarded.

        ``emergency_halt`` and ``peer_ban`` are handled internally; any
        other type is forwarded to the handlers registered through
        :meth:`on_control`. A handler that raises is logged and skipped.
        Messages that are not dicts are ignored.
        """
        if not isinstance(message, dict):
            logger.warning("Control message is not a dict — IGNORING")
            return

        msg_type = message.get('type', '')

        # EMERGENCY HALT — requires master key signature
        if msg_type == 'emergency_halt':
            self._handle_emergency_halt(message)
            return

        # PEER BAN
        if msg_type == 'peer_ban':
            self._handle_peer_ban(message)
            return

        # Forward to registered handlers (iterate a copy so a handler may
        # register further handlers without disturbing this dispatch).
        for handler in list(self._control_handlers):
            try:
                handler(message)
            except Exception as e:
                logger.debug(f"Control handler error: {e}")

    def _handle_emergency_halt(self, message: dict):
        """Verify master key signature and trip circuit breaker.

        The halt is acted on only when ``master_signature`` verifies
        against the master public key over the rest of the message. Any
        failure to verify (missing signature, missing security modules,
        verifier error, bad signature) leaves the node running.
        """
        signature = message.get('master_signature', '')
        if not signature:
            logger.warning("Emergency halt without signature — IGNORING")
            return

        try:
            from security.master_key import MASTER_PUBLIC_KEY_HEX
            from security.node_integrity import verify_json_signature
        except ImportError:
            logger.error(
                "Cannot verify emergency halt — security modules missing")
            return

        # The signature covers the message without the signature field.
        payload = {k: v for k, v in message.items()
                   if k != 'master_signature'}
        try:
            valid = verify_json_signature(
                MASTER_PUBLIC_KEY_HEX, payload, signature)
        except ImportError:
            logger.error(
                "Cannot verify emergency halt — security modules missing")
            return
        except Exception as e:
            # A malformed signature must not crash the control path.
            logger.warning(
                f"Emergency halt signature check failed ({e}) — IGNORING")
            return

        if not valid:
            logger.warning(
                "Emergency halt INVALID signature — IGNORING")
            return

        logger.critical(
            "EMERGENCY HALT: Valid master key signature — "
            "tripping circuit breaker")
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            HiveCircuitBreaker.trip(
                reason=message.get('reason', 'emergency_halt'))
        except Exception as e:
            logger.critical(f"Circuit breaker trip failed: {e}")

        # Gossip-forward to peers (backup delivery)
        try:
            from integrations.social.peer_discovery import gossip
            gossip.broadcast({
                'type': 'emergency_halt_relay',
                'original': message,
            })
        except Exception as e:
            logger.debug(f"Emergency halt gossip relay failed: {e}")

    def _handle_peer_ban(self, message: dict):
        """Ban a peer network-wide.

        Closes any PeerLink to the peer, marks it ``'banned'`` in the
        database if a row exists, and records a security event. Each step
        is best-effort: a failure is logged and the remaining steps run.
        """
        # NOTE(review): unlike emergency_halt, no signature is checked
        # here — confirm that callers authenticate peer_ban messages.
        banned_node = message.get('node_id', '')
        if not banned_node or not isinstance(banned_node, str):
            return

        logger.info(f"Peer ban received: {banned_node[:8]}")

        # Close PeerLink if connected
        try:
            from core.peer_link.link_manager import get_link_manager
            get_link_manager().close_link(banned_node)
        except Exception as e:
            logger.debug(f"Peer ban: closing link failed: {e}")

        # Update DB
        try:
            from integrations.social.models import get_db, PeerNode
            db = get_db()
            try:
                peer = db.query(PeerNode).filter_by(
                    node_id=banned_node).first()
                if peer:
                    peer.integrity_status = 'banned'
                    db.commit()
            finally:
                db.close()
        except Exception as e:
            logger.debug(f"Peer ban: DB update failed: {e}")

        self._telemetry.record_security_event('peer_ban', banned_node)

382 

383 

384# --- Singleton ------------------------------------------------ 

385 

# Process-wide CentralConnection, built on first request.
_central: Optional[CentralConnection] = None
# Serialises first-time construction of the instance above.
_central_lock = threading.Lock()


def get_central_connection() -> CentralConnection:
    """Return the shared CentralConnection, creating it on first use.

    The instance is only constructed here; this function does not call
    ``start()`` on it.
    """
    global _central
    # Fast path: instance already built, no need to take the lock.
    if _central is not None:
        return _central
    with _central_lock:
        # Re-check under the lock: another thread may have built the
        # instance while this one was waiting.
        if _central is None:
            _central = CentralConnection()
        return _central

396 return _central