Coverage for core / peer_link / telemetry.py: 92.2%
192 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Telemetry & Safety — Crossbar connection for central monitoring.

NOT optional. Three functions:
  1. Telemetry: node -> central (traffic stats, compute metrics, health)
  2. Control:   central -> nodes (upgrades, bans, emergency halt)
  3. Probe:     central <-> node (diagnostics RPC)

Central sees METADATA only — never message content.
  - Traffic volume per channel (msg count, byte count)
  - Peer connection topology
  - GPU/compute utilization
  - Economic flows (Spark earned/spent)
  - Security events (guardrail violations, integrity status)
  NOT message content — NEVER
  NOT user prompts/responses — NEVER
  NOT PeerLink E2E payloads — NEVER

Safety: kill switch delivery via Crossbar (instant) + gossip (backup).
  Master key signed emergency_halt -> HiveCircuitBreaker.trip()
  Node disconnected >24h -> self-restrict (safety measure).
"""
23import logging
24import os
25import threading
26import time
27from collections import defaultdict
28from typing import Any, Callable, Dict, List, Optional
# Logger shared by this module, under the 'hevolve.peer_link' name.
logger = logging.getLogger('hevolve.peer_link')

# Disconnection thresholds (seconds), written as products of their units.
_DEGRADED_THRESHOLD = 60 * 60  # 1 hour -> degraded mode
_RESTRICTED_THRESHOLD = 24 * 60 * 60  # 24 hours -> restricted mode
class TelemetryCollector:
    """Accumulate traffic counters and security events between publishes.

    Traffic is tracked per channel as message counts and byte totals.
    Security events are kept in a list capped at the 100 most recent.
    Both are handed out and cleared together by ``get_summary``.
    """

    @staticmethod
    def _fresh_counters() -> Dict[str, int]:
        """Return a zeroed counter record for a channel seen for the first time."""
        return {'sent': 0, 'recv': 0, 'bytes_sent': 0, 'bytes_recv': 0}

    def __init__(self):
        self._lock = threading.Lock()
        self._traffic: Dict[str, Dict[str, int]] = defaultdict(
            self._fresh_counters)
        self._security_events: List[dict] = []

    def record_sent(self, channel: str, byte_count: int):
        """Count one outbound message of ``byte_count`` bytes on ``channel``."""
        with self._lock:
            counters = self._traffic[channel]
            counters['sent'] += 1
            counters['bytes_sent'] += byte_count

    def record_received(self, channel: str, byte_count: int):
        """Count one inbound message of ``byte_count`` bytes on ``channel``."""
        with self._lock:
            counters = self._traffic[channel]
            counters['recv'] += 1
            counters['bytes_recv'] += byte_count

    def record_security_event(self, event_type: str, details: str = ''):
        """Append a timestamped security event.

        ``details`` is truncated to 200 characters, and only the 100 most
        recent events are retained.
        """
        with self._lock:
            entry = {
                'type': event_type,
                'details': details[:200],
                'timestamp': time.time(),
            }
            self._security_events.append(entry)
            # Drop everything older than the newest 100 entries.
            if len(self._security_events) > 100:
                del self._security_events[:-100]

    def get_summary(self) -> dict:
        """Return the collected traffic and events, then reset both.

        Returns:
            A dict with ``'traffic'`` (channel -> counter dict) and
            ``'security_events'`` (list of event dicts).
        """
        with self._lock:
            # Swap the live map out so later records land in a fresh one.
            drained_traffic = self._traffic
            self._traffic = defaultdict(self._fresh_counters)
            drained_events = self._security_events[:]
            self._security_events.clear()

        plain_traffic = {}
        for channel, counters in drained_traffic.items():
            plain_traffic[channel] = dict(counters)
        return {
            'traffic': plain_traffic,
            'security_events': drained_events,
        }
class CentralConnection:
    """Always-on connection to central Crossbar for telemetry and safety.

    Publishes telemetry on a fixed interval (60 seconds by default,
    overridable through ``HEVOLVE_TELEMETRY_INTERVAL``).
    Dispatches control messages (emergency halt, peer bans, custom handlers).
    Tracks how long the node has been unable to reach central so callers can
    apply degraded (>1h) and restricted (>24h) modes.
    """

    def __init__(self):
        self._crossbar_url = os.environ.get(
            'CBURL', 'ws://aws_rasa.hertzai.com:8088/ws')
        self._realm = os.environ.get('CBREALM', 'realm1')
        self._connected = False
        # Wall-clock time at which the current outage began; None while
        # connected or before the first connection attempt.
        self._disconnected_since: Optional[float] = None
        self._lock = threading.Lock()
        self._telemetry = TelemetryCollector()
        self._control_handlers: List[Callable] = []
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._telemetry_interval = self._read_interval()
        self._node_id = ''

    @staticmethod
    def _read_interval() -> int:
        """Read the publish interval (seconds) from the environment.

        Falls back to 60 when ``HEVOLVE_TELEMETRY_INTERVAL`` is not an
        integer, and clamps to at least 1 so the telemetry loop always
        sleeps between cycles instead of spinning.
        """
        raw = os.environ.get('HEVOLVE_TELEMETRY_INTERVAL', '60')
        try:
            seconds = int(raw)
        except (TypeError, ValueError):
            return 60
        return max(1, seconds)

    @property
    def telemetry(self) -> 'TelemetryCollector':
        """Collector that callers feed traffic and security events into."""
        return self._telemetry

    @property
    def is_connected(self) -> bool:
        """Whether an outbound transport was available at the last check."""
        return self._connected

    def start(self):
        """Start the background telemetry thread (no-op if already running)."""
        # Check-and-set under the lock so concurrent callers cannot both
        # spawn a telemetry thread.
        with self._lock:
            if self._running:
                return
            self._running = True

        # Resolve node identity; telemetry is still useful without it.
        try:
            from security.node_integrity import get_node_identity
            self._node_id = get_node_identity().get('node_id', 'unknown')
        except Exception:
            self._node_id = 'unknown'

        self._thread = threading.Thread(
            target=self._telemetry_loop, daemon=True,
            name='peerlink-telemetry')
        self._thread.start()
        logger.info("CentralConnection started")

    def stop(self):
        """Ask the telemetry thread to exit and wait up to 10s for it."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    def on_control(self, handler: Callable) -> None:
        """Register handler for control messages from central."""
        self._control_handlers.append(handler)

    def _disconnected_seconds(self) -> float:
        """Return seconds since the current outage began (0.0 if none)."""
        # Snapshot once: the telemetry thread may reset the attribute to
        # None between a check and a subtraction.
        since = self._disconnected_since
        if self._connected or since is None:
            return 0.0
        return time.time() - since

    def is_degraded(self) -> bool:
        """Node is in degraded mode (>1h disconnected from central)."""
        return self._disconnected_seconds() > _DEGRADED_THRESHOLD

    def is_restricted(self) -> bool:
        """Node is in restricted mode (>24h disconnected from central)."""
        return self._disconnected_seconds() > _RESTRICTED_THRESHOLD

    def get_disconnection_hours(self) -> float:
        """Return the length of the current outage in hours (0.0 if none)."""
        return self._disconnected_seconds() / 3600

    # --- Internal ------------------------------------------------

    def _telemetry_loop(self):
        """Background: publish telemetry, handle control messages."""
        while self._running:
            try:
                self._try_connect()
                if self._connected:
                    self._publish_telemetry()
                    self._check_control_messages()
            except Exception as e:
                logger.debug("Telemetry loop error: %s", e)
                self._mark_disconnected()

            # Sleep in small increments so stop() takes effect within ~1s.
            for _ in range(self._telemetry_interval):
                if not self._running:
                    break
                time.sleep(1)

    def _try_connect(self):
        """Check if any outbound transport is available (WAMP or HTTP)."""
        # NOTE(review): once connected, transports are not re-probed; an
        # outage is only noticed when a publish raises. Confirm intended.
        if self._connected:
            return

        # Check WAMP session. Any import-time failure just means "no WAMP";
        # it must not prevent the HTTP probe below.
        try:
            from crossbar_server import wamp_session
            if wamp_session is not None:
                self._connected = True
                self._disconnected_since = None
                return
        except Exception:
            pass

        # Check if MessageBus has HTTP transport injected
        try:
            from core.peer_link.message_bus import get_message_bus
            bus = get_message_bus()
            if getattr(bus, '_http_transport', None) is not None:
                self._connected = True
                self._disconnected_since = None
                return
        except Exception:
            pass

        self._mark_disconnected()

    def _mark_disconnected(self):
        """Flag the node as disconnected and stamp the start of the outage.

        The timestamp is set only when no outage is already being tracked.
        That keeps the original start time across repeated failures, and
        also starts the clock for a node that never connected at all, so
        the degraded/restricted thresholds still apply to it.
        """
        self._connected = False
        if self._disconnected_since is None:
            self._disconnected_since = time.time()

    def _publish_telemetry(self):
        """Publish node telemetry to central (metadata only, never content)."""
        # NOTE(review): get_summary() resets the counters, so a failed
        # publish below drops this cycle's data.
        summary = self._telemetry.get_summary()

        telemetry = {
            'node_id': self._node_id,
            'timestamp': time.time(),
            'traffic': summary.get('traffic', {}),
            'security_events': summary.get('security_events', []),
        }

        # Add compute metrics
        try:
            from integrations.service_tools.vram_manager import detect_gpu
            gpu = detect_gpu()
            telemetry['compute'] = {
                'gpu_available': gpu.get('available', False),
                'gpu_name': gpu.get('device_name', ''),
                'vram_free_mb': gpu.get('vram_free_mb', 0),
            }
        except Exception:
            telemetry['compute'] = {}

        # Add peer link stats
        try:
            from core.peer_link.link_manager import get_link_manager
            status = get_link_manager().get_status()
            telemetry['peer_links'] = {
                'active': status.get('active_links', 0),
                'encrypted': status.get('encrypted_links', 0),
                'total': status.get('total_links', 0),
            }
        except Exception:
            telemetry['peer_links'] = {}

        # Add health. psutil is optional; a local metrics failure must not
        # escape and be mistaken for a lost connection by the loop.
        telemetry['health'] = {
            'cpu_count': os.cpu_count() or 1,
        }
        try:
            import psutil
            telemetry['health']['cpu_percent'] = psutil.cpu_percent()
            telemetry['health']['memory_percent'] = (
                psutil.virtual_memory().percent)
        except Exception:
            pass

        # Publish via MessageBus (handles WAMP → HTTP fallback internally)
        # Telemetry is central-only: skip PeerLink (no peer needs our metrics)
        try:
            from core.peer_link.message_bus import get_message_bus
            get_message_bus().publish('telemetry.node', telemetry,
                                      skip_peerlink=True)
        except Exception as e:
            logger.debug("Telemetry publish failed: %s", e)
            self._mark_disconnected()

    def _check_control_messages(self):
        """Check for control messages from central (emergency halt, bans).

        In WAMP mode, control messages arrive via subscription
        (handled in crossbar_server.py @component.on_join).
        Here we handle the HTTP polling fallback.
        """
        pass

    def handle_control_message(self, message: dict):
        """Process a control message from central.

        Called by crossbar_server.py when control broadcast received,
        or by gossip when control message gossip-forwarded.

        ``emergency_halt`` and ``peer_ban`` are handled internally; every
        other type is passed to the handlers registered via ``on_control``.
        Non-dict messages are ignored.
        """
        if not isinstance(message, dict):
            logger.warning("Ignoring malformed control message (not a dict)")
            return

        msg_type = message.get('type', '')

        # EMERGENCY HALT — requires master key signature
        if msg_type == 'emergency_halt':
            self._handle_emergency_halt(message)
            return

        # PEER BAN
        if msg_type == 'peer_ban':
            self._handle_peer_ban(message)
            return

        # Forward to registered handlers. Iterate a snapshot so a handler
        # registered concurrently cannot disturb the iteration.
        for handler in list(self._control_handlers):
            try:
                handler(message)
            except Exception as e:
                logger.debug("Control handler error: %s", e)

    def _handle_emergency_halt(self, message: dict):
        """Verify master key signature and trip circuit breaker.

        The halt is ignored when the signature is absent, invalid, or cannot
        be verified. On success the breaker is tripped and the message is
        gossip-forwarded to peers as a backup delivery path.
        """
        signature = message.get('master_signature', '')
        if not signature:
            logger.warning("Emergency halt without signature — IGNORING")
            return

        try:
            from security.master_key import MASTER_PUBLIC_KEY_HEX
            from security.node_integrity import verify_json_signature
        except ImportError:
            logger.error(
                "Cannot verify emergency halt — security modules missing")
            return

        # The signature covers the message without the signature field.
        payload = dict(message)
        payload.pop('master_signature', None)

        # A verifier that raises on malformed input counts as a failed check.
        try:
            valid = verify_json_signature(
                MASTER_PUBLIC_KEY_HEX, payload, signature)
        except Exception as e:
            logger.warning(
                "Emergency halt signature check failed (%s) — IGNORING", e)
            return

        if not valid:
            logger.warning(
                "Emergency halt INVALID signature — IGNORING")
            return

        # NOTE(review): nothing here rejects a replayed (old but validly
        # signed) halt — confirm whether the verifier or caller handles it.
        logger.critical(
            "EMERGENCY HALT: Valid master key signature — "
            "tripping circuit breaker")
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            HiveCircuitBreaker.trip(
                reason=message.get('reason', 'emergency_halt'))
        except Exception as e:
            logger.critical("Circuit breaker trip failed: %s", e)

        # Gossip-forward to peers (backup delivery)
        try:
            from integrations.social.peer_discovery import gossip
            gossip.broadcast({
                'type': 'emergency_halt_relay',
                'original': message,
            })
        except Exception as e:
            logger.debug("Emergency halt gossip relay failed: %s", e)

    def _handle_peer_ban(self, message: dict):
        """Ban a peer network-wide.

        Closes any open link to the peer, marks it ``'banned'`` in the DB
        if a record exists, and records a ``peer_ban`` security event.
        Each step is best-effort; failures are logged, not raised.
        """
        # NOTE(review): unlike emergency_halt, no signature is checked here;
        # confirm the transport authenticates ban messages.
        banned_node = message.get('node_id', '')
        if not banned_node or not isinstance(banned_node, str):
            return

        logger.info("Peer ban received: %s", banned_node[:8])

        # Close PeerLink if connected
        try:
            from core.peer_link.link_manager import get_link_manager
            get_link_manager().close_link(banned_node)
        except Exception as e:
            logger.debug("Peer ban: closing link failed: %s", e)

        # Update DB
        try:
            from integrations.social.models import get_db, PeerNode
            db = get_db()
            try:
                peer = db.query(PeerNode).filter_by(
                    node_id=banned_node).first()
                if peer:
                    peer.integrity_status = 'banned'
                    db.commit()
            finally:
                db.close()
        except Exception as e:
            logger.warning("Peer ban: DB update failed: %s", e)

        self._telemetry.record_security_event('peer_ban', banned_node)
384# --- Singleton ------------------------------------------------
# Process-wide CentralConnection, created on first request.
_central: Optional[CentralConnection] = None
# Guards creation of the shared instance.
_central_lock = threading.Lock()


def get_central_connection() -> CentralConnection:
    """Return the shared CentralConnection, creating it on first call.

    The unlocked check is the fast path once the instance exists. The
    check is repeated under ``_central_lock`` so that only one instance is
    constructed when several threads arrive together.
    """
    global _central
    if _central is not None:
        return _central
    with _central_lock:
        if _central is None:
            _central = CentralConnection()
    return _central