Coverage for integrations / robotics / safety_monitor.py: 86.1%
180 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Safety Monitor — E-Stop + Operational Domain Enforcement
4Safety first. A robot that moves but has no emergency stop is dangerous.
5A GPIO pin going LOW must halt all motor output within 50ms, not wait
6for a gossip round.
8Design principles:
9 1. Safety is local-first. E-stop does NOT require network, master key,
10 or agent approval.
11 2. Agents cannot clear E-stop. Only human operators can.
12 3. Every motor command passes through workspace limit checks.
13 4. Fleet notification (gossip) is informational, not a gate.
15Usage:
16 from integrations.robotics.safety_monitor import SafetyMonitor
17 monitor = SafetyMonitor()
18 monitor.register_estop_pin(17) # GPIO 17 as hardware E-stop
19 monitor.register_workspace_limits({'x': (-1.0, 1.0), 'y': (-0.5, 0.5)})
20 monitor.start()
21"""
22import logging
23import os
24import re
25import threading
26import time
27from typing import Callable, Dict, List, Optional, Tuple
# Named logger used by every message this module emits.
logger = logging.getLogger('hevolve_robotics')

# Lazily-created process-wide instance; see get_safety_monitor().
_monitor: Optional['SafetyMonitor'] = None
# Guards first-time construction of _monitor across threads.
_monitor_lock = threading.Lock()
def get_safety_monitor() -> 'SafetyMonitor':
    """Return the process-wide SafetyMonitor, building it on first use.

    The unlocked check keeps the common (already-built) path lock-free; the
    re-check under ``_monitor_lock`` guarantees a single instance when several
    threads race on the very first call.
    """
    global _monitor
    if _monitor is not None:
        return _monitor
    with _monitor_lock:
        if _monitor is None:
            _monitor = SafetyMonitor()
    return _monitor
class SafetyMonitor:
    """E-Stop + Operational Domain Monitor.

    Thread-safe. Runs a dedicated monitor thread at ``MONITOR_HZ`` (20Hz) for
    E-stop polling when hardware sources are registered.

    The E-stop latches: once triggered it stays active until a human operator
    calls :meth:`clear_estop`. If a hardware source still reads as triggered
    after a clear, the next poll triggers it again.
    """

    MONITOR_HZ = 20  # 50ms poll interval

    # Operator-id prefixes that identify automated (non-human) callers. These
    # callers may never clear an E-stop. Matched case-insensitively after
    # stripping surrounding whitespace.
    NON_HUMAN_ID_PREFIXES = ('agent_', 'bot_', 'system_', 'auto_')

    def __init__(self):
        self._lock = threading.RLock()
        self._estop_active = False
        self._estop_reason = ''
        self._estop_source = ''
        self._estop_timestamp = 0.0
        self._cleared_by = ''

        # Workspace limits: {axis: (min, max)}
        self._workspace_limits: Dict[str, Tuple[float, float]] = {}
        self._joint_limits: Dict[str, Tuple[float, float]] = {}

        # Registered E-stop sources
        self._estop_gpio_pins: List[int] = []
        # [{'port': str, 'pattern': str, 'compiled': Pattern}]. The monitor
        # thread also caches an open 'handle' and a 'tail' buffer in each entry.
        self._estop_serial_ports: List[Dict] = []

        # Callbacks for E-stop events
        self._on_estop_callbacks: List[Callable] = []

        # Audit trail, bounded to the newest _max_audit entries
        self._audit_trail: List[Dict] = []
        self._max_audit = 100

        # Source keys (e.g. 'gpio_17') whose last read failed. A broken source
        # is then logged once, instead of once per 50ms poll.
        self._failed_sources: set = set()

        # Monitor thread
        self._monitor_thread: Optional[threading.Thread] = None
        self._running = False
        # Set by stop() to wake the monitor thread out of its poll wait.
        self._stop_event = threading.Event()

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register_estop_pin(self, pin: int):
        """Register a GPIO pin as a hardware E-stop source.

        When the pin reads LOW (pulled to ground), the E-stop triggers.
        Registering the same pin twice has no effect.
        """
        with self._lock:
            if pin not in self._estop_gpio_pins:
                self._estop_gpio_pins.append(pin)
                logger.info(f"Safety: registered E-stop GPIO pin {pin}")

    def register_estop_serial(self, port: str, trigger_pattern: str = 'ESTOP'):
        """Register a serial port as an E-stop source.

        When the trigger pattern (matched literally) is received on the
        serial port, the E-stop triggers.
        """
        with self._lock:
            self._estop_serial_ports.append({
                'port': port,
                'pattern': trigger_pattern,
                'compiled': re.compile(re.escape(trigger_pattern)),
            })
            logger.info(f"Safety: registered E-stop serial {port} (pattern={trigger_pattern})")

    def register_workspace_limits(self, limits: Dict):
        """Define operational domain bounds.

        Args:
            limits: Dict with axis limits and optional joint limits.
                {
                    'x': (-1.0, 1.0),
                    'y': (-0.5, 0.5),
                    'z': (0.0, 1.2),
                    'joint_limits': {'joint_0': (-90, 90), 'joint_1': (0, 180)}
                }
                The caller's dict is not modified. An entry that is not a
                2-element list/tuple is skipped with a warning. A bound that
                is not numeric raises ValueError from float().
        """
        joint_limits = limits.get('joint_limits') or {}
        with self._lock:
            for axis, bounds in limits.items():
                if axis == 'joint_limits':
                    continue
                parsed = self._parse_bounds('axis', axis, bounds)
                if parsed is not None:
                    self._workspace_limits[axis] = parsed
            for joint, bounds in joint_limits.items():
                parsed = self._parse_bounds('joint', joint, bounds)
                if parsed is not None:
                    self._joint_limits[joint] = parsed
            logger.info(
                f"Safety: workspace limits set — "
                f"axes={list(self._workspace_limits.keys())}, "
                f"joints={list(self._joint_limits.keys())}"
            )

    @staticmethod
    def _parse_bounds(kind: str, name: str, bounds) -> Optional[Tuple[float, float]]:
        """Convert a (min, max) pair to floats.

        Returns None, after logging a warning, when the pair is malformed.
        """
        if not (isinstance(bounds, (list, tuple)) and len(bounds) == 2):
            logger.warning(f"Safety: ignoring malformed {kind} limit {name}={bounds!r}")
            return None
        lo, hi = float(bounds[0]), float(bounds[1])
        if lo > hi:
            # The limit is still stored: every position on it is rejected,
            # which fails safe. It is flagged here because it is almost
            # certainly a config error.
            logger.warning(
                f"Safety: {kind} limit {name} has min > max ({lo} > {hi}); "
                f"every value on it will be rejected"
            )
        return lo, hi

    def on_estop(self, callback: Callable):
        """Register a callback for E-stop events.

        Callback receives (reason: str, source: str). Exceptions raised by a
        callback are logged and do not stop the other callbacks.
        """
        with self._lock:
            self._on_estop_callbacks.append(callback)

    # ------------------------------------------------------------------
    # Checks and E-stop state
    # ------------------------------------------------------------------

    def check_position_safe(self, position: Dict) -> bool:
        """Validate a target position against workspace limits.

        Args:
            position: Dict with axis values, e.g. {'x': 0.5, 'y': 0.2, 'z': 0.8}
                and/or joint values, e.g. {'joint_0': 45.0}. Axes or joints
                absent from the dict (or None) are not checked.

        Returns:
            True if the E-stop is inactive and every supplied value is within
            its configured limits. NaN and non-numeric values count as unsafe.
        """
        if self._estop_active:
            return False

        with self._lock:
            checks = (
                ('position', self._workspace_limits),  # Cartesian workspace
                ('joint', self._joint_limits),
            )
            for label, table in checks:
                for name, (lo, hi) in table.items():
                    val = position.get(name)
                    if val is None:
                        continue
                    if not self._within(val, lo, hi):
                        logger.warning(
                            f"Safety: {label} {name}={val} outside limits [{lo}, {hi}]"
                        )
                        return False
        return True

    @staticmethod
    def _within(value, lo: float, hi: float) -> bool:
        """Return True if value converts to a float inside [lo, hi]."""
        try:
            v = float(value)
        except (TypeError, ValueError):
            return False
        # A positive range test, so NaN (which compares False to everything)
        # is rejected instead of slipping through a pair of "outside" tests.
        return lo <= v <= hi

    def _append_audit(self, entry: Dict):
        """Append an audit entry and keep only the newest _max_audit entries.

        The caller must hold self._lock.
        """
        self._audit_trail.append(entry)
        if len(self._audit_trail) > self._max_audit:
            del self._audit_trail[:-self._max_audit]

    def trigger_estop(self, reason: str, source: str = 'manual'):
        """Trigger emergency stop. Immediate. No network dependency.

        Sets the HEVOLVE_HALTED env var, fires callbacks and writes to the
        audit trail. Fleet notification is informational (best-effort gossip).
        Does nothing if the E-stop is already active.

        Args:
            reason: Human-readable reason for the E-stop.
            source: Source identifier ('gpio_17', 'serial_/dev/ttyUSB0', 'manual', 'fleet').
        """
        with self._lock:
            if self._estop_active:
                return  # Already stopped

            self._estop_active = True
            self._estop_reason = reason
            self._estop_source = source
            self._estop_timestamp = time.time()
            self._cleared_by = ''

            # Set halt flag immediately
            os.environ['HEVOLVE_HALTED'] = 'true'
            os.environ['HEVOLVE_HALT_REASON'] = f'E-STOP: {reason}'

            self._append_audit({
                'event': 'estop_triggered',
                'reason': reason,
                'source': source,
                'timestamp': self._estop_timestamp,
            })
            # Snapshot so callbacks can run outside the lock. This avoids
            # racing with concurrent on_estop() registrations.
            callbacks = list(self._on_estop_callbacks)

        logger.critical(f"E-STOP TRIGGERED: {reason} (source={source})")

        # Fire callbacks outside the lock so they may call back into this monitor.
        for cb in callbacks:
            try:
                cb(reason, source)
            except Exception as e:
                logger.error(f"Safety callback error: {e}")

        # Local halt via HiveCircuitBreaker (no master key needed). The module
        # is optional. A failure inside it must not prevent the steps below.
        try:
            from security.hive_guardrails import HiveCircuitBreaker
        except ImportError:
            HiveCircuitBreaker = None
        if HiveCircuitBreaker is not None:
            try:
                HiveCircuitBreaker.local_halt(f'E-STOP: {reason}')
            except Exception as e:
                logger.error(f"Safety: local halt failed: {e}")

        # Best-effort gossip notification (informational)
        self._gossip_estop(reason, source)

    def clear_estop(self, operator_id: str) -> bool:
        """Clear E-stop. HUMAN ONLY. Agents cannot clear.

        Clearing only resets this monitor's state and the HEVOLVE_HALTED /
        HEVOLVE_HALT_REASON env vars. A hardware source that is still
        triggered will re-trigger the E-stop on the monitor's next poll.
        NOTE(review): HiveCircuitBreaker.local_halt() from trigger_estop is not
        undone here; confirm whether it needs a matching resume.

        Args:
            operator_id: Human operator identifier. It must be a non-blank
                string. After stripping surrounding whitespace, it must not
                start (case-insensitively) with 'agent_', 'bot_', 'system_'
                or 'auto_'.

        Returns:
            True if the E-stop was cleared or was not active. False if the
            clear was rejected.
        """
        normalized = operator_id.strip() if isinstance(operator_id, str) else ''
        if not normalized:
            logger.warning("Safety: E-stop clear rejected — no operator_id")
            return False

        # Reject agent-initiated clears. Stripping first means a leading
        # space cannot be used to sneak past the prefix check.
        if normalized.lower().startswith(self.NON_HUMAN_ID_PREFIXES):
            logger.warning(
                f"Safety: E-stop clear rejected — "
                f"operator '{operator_id}' appears to be an agent, not a human"
            )
            return False

        with self._lock:
            if not self._estop_active:
                return True  # Already clear

            self._estop_active = False
            self._cleared_by = normalized

            os.environ.pop('HEVOLVE_HALTED', None)
            os.environ.pop('HEVOLVE_HALT_REASON', None)

            self._append_audit({
                'event': 'estop_cleared',
                'operator_id': normalized,
                'previous_reason': self._estop_reason,
                'timestamp': time.time(),
            })

        logger.info(f"E-STOP CLEARED by operator: {normalized}")
        return True

    @property
    def is_estopped(self) -> bool:
        """Check if E-stop is currently active."""
        return self._estop_active

    def get_safety_status(self) -> Dict:
        """Get current safety status for dashboard/fleet reporting.

        Returns copies of the internal state. Only the last 10 audit entries
        are included.
        """
        with self._lock:
            return {
                'estop_active': self._estop_active,
                'estop_reason': self._estop_reason,
                'estop_source': self._estop_source,
                'estop_timestamp': self._estop_timestamp,
                'cleared_by': self._cleared_by,
                'workspace_limits': dict(self._workspace_limits),
                'joint_limits': dict(self._joint_limits),
                'estop_gpio_pins': list(self._estop_gpio_pins),
                'estop_serial_ports': [
                    {'port': s['port'], 'pattern': s['pattern']}
                    for s in self._estop_serial_ports
                ],
                'audit_trail': list(self._audit_trail[-10:]),
                'monitor_running': self._running,
            }

    # ------------------------------------------------------------------
    # Monitor thread
    # ------------------------------------------------------------------

    def start(self):
        """Start the E-stop monitor thread (20Hz).

        Only starts if GPIO or serial E-stop sources are registered. Does
        nothing if the monitor is already running.
        """
        with self._lock:
            if self._running:
                return
            if not self._estop_gpio_pins and not self._estop_serial_ports:
                logger.info("Safety: no E-stop sources registered, monitor not started")
                return
            self._running = True
            # A fresh event per thread: a previous thread that outlived
            # stop()'s join timeout still sees its own (set) event and exits.
            self._stop_event = threading.Event()
            self._monitor_thread = threading.Thread(
                target=self._monitor_loop,
                args=(self._stop_event,),
                name='safety_estop_monitor',
                daemon=True,
            )
            self._monitor_thread.start()
        logger.info(f"Safety: E-stop monitor started at {self.MONITOR_HZ}Hz")

    def stop(self):
        """Stop the E-stop monitor thread.

        Waits up to 1s for the thread to exit. The join happens outside the
        lock, so a poll that is triggering an E-stop cannot deadlock against
        this call. Calling stop() from the monitor thread itself (e.g. from an
        E-stop callback) does not attempt a self-join.
        """
        with self._lock:
            self._running = False
            self._stop_event.set()
            thread = self._monitor_thread
        if (thread is not None and thread.is_alive()
                and thread is not threading.current_thread()):
            thread.join(timeout=1.0)
        logger.info("Safety: E-stop monitor stopped")

    def _monitor_loop(self, stop_event: Optional[threading.Event] = None):
        """Poll GPIO and serial E-stop sources at MONITOR_HZ until stopped."""
        if stop_event is None:
            stop_event = self._stop_event
        interval = 1.0 / self.MONITOR_HZ
        try:
            while self._running and not stop_event.is_set():
                started = time.monotonic()
                try:
                    self._check_gpio_estops()
                    self._check_serial_estops()
                except Exception as e:
                    logger.error(f"Safety monitor error: {e}")
                # Wait only for what is left of the period, so polling stays
                # near MONITOR_HZ. wait() also returns as soon as stop() fires.
                stop_event.wait(max(0.0, interval - (time.monotonic() - started)))
        finally:
            # Release serial ports held open between polls.
            self._close_serial_handles()

    def _note_source_failure(self, key: str, exc: Exception):
        """Log a read failure for an E-stop source once, until it recovers."""
        if key not in self._failed_sources:
            self._failed_sources.add(key)
            logger.error(f"Safety: cannot read E-stop source {key}: {exc}")

    def _note_source_ok(self, key: str):
        """Mark an E-stop source readable again after a successful read."""
        if key in self._failed_sources:
            self._failed_sources.discard(key)
            logger.info(f"Safety: E-stop source {key} readable again")

    def _check_gpio_estops(self):
        """Check registered GPIO pins for E-stop trigger (LOW = triggered).

        Uses the gpiod bindings (chip 'gpiochip0') when importable. Otherwise
        it falls back to RPi.GPIO (BCM numbering, internal pull-up). It does
        nothing when neither library is installed.
        """
        with self._lock:
            pins = list(self._estop_gpio_pins)
        if not pins:
            return

        try:
            import gpiod
        except ImportError:
            gpiod = None
        if gpiod is not None:
            self._check_pins_gpiod(gpiod, pins)
            return

        # Try RPi.GPIO fallback
        try:
            import RPi.GPIO as rpi_gpio
        except ImportError:
            return  # No GPIO library available
        self._check_pins_rpi(rpi_gpio, pins)

    def _check_pins_gpiod(self, gpiod, pins: List[int]):
        """Read pins via gpiod's Chip/get_line API and trigger on the first LOW pin."""
        chip = gpiod.Chip('gpiochip0')
        try:
            for pin in pins:
                key = f'gpio_{pin}'
                try:
                    line = chip.get_line(pin)
                    line.request(consumer='hart_estop', type=gpiod.LINE_REQ_DIR_IN)
                    try:
                        value = line.get_value()
                    finally:
                        # Release even when the read fails. Otherwise the line
                        # stays requested, the next poll's request fails, and
                        # the pin silently stops being monitored.
                        line.release()
                except Exception as e:
                    self._note_source_failure(key, e)
                    continue
                self._note_source_ok(key)
                if value == 0:  # LOW = E-stop triggered
                    self.trigger_estop(
                        f'Hardware E-stop pin {pin} triggered (LOW)',
                        source=key,
                    )
                    return
        finally:
            # The chip is opened on every poll, so close it every poll too.
            chip.close()

    def _check_pins_rpi(self, rpi_gpio, pins: List[int]):
        """Read pins via RPi.GPIO (BCM numbering, pull-up) and trigger on the first LOW pin."""
        for pin in pins:
            key = f'gpio_{pin}'
            try:
                rpi_gpio.setmode(rpi_gpio.BCM)
                rpi_gpio.setup(pin, rpi_gpio.IN, pull_up_down=rpi_gpio.PUD_UP)
                level = rpi_gpio.input(pin)
            except Exception as e:
                self._note_source_failure(key, e)
                continue
            self._note_source_ok(key)
            if level == rpi_gpio.LOW:
                self.trigger_estop(
                    f'Hardware E-stop pin {pin} triggered (LOW)',
                    source=key,
                )
                return

    def _check_serial_estops(self):
        """Check registered serial ports for the E-stop trigger pattern.

        Each port is kept open between polls, so bytes sent between polls
        are not dropped. The last len(pattern) - 1 characters are carried
        over to the next poll, so a pattern split across two reads still
        matches. A port whose open or read fails is closed and retried on the
        next poll.
        """
        with self._lock:
            ports = list(self._estop_serial_ports)
        if not ports:
            return

        try:
            import serial
        except ImportError:
            return

        for cfg in ports:
            port = cfg['port']
            key = f'serial_{port}'
            try:
                handle = cfg.get('handle')
                if handle is None:
                    handle = serial.Serial(port, baudrate=9600, timeout=0.01)
                    cfg['handle'] = handle
                data = handle.read(256)
            except Exception as e:
                self._close_serial(cfg)
                self._note_source_failure(key, e)
                continue
            self._note_source_ok(key)
            if not data:
                continue

            text = cfg.get('tail', '') + data.decode('utf-8', errors='ignore')
            if cfg['compiled'].search(text):
                cfg['tail'] = ''
                self.trigger_estop(
                    f"Serial E-stop pattern '{cfg['pattern']}' "
                    f"received on {port}",
                    source=key,
                )
                return
            keep = len(cfg['pattern']) - 1
            cfg['tail'] = text[-keep:] if keep > 0 else ''

    @staticmethod
    def _close_serial(cfg: Dict):
        """Close and forget the cached serial handle (and tail) for one port config."""
        handle = cfg.pop('handle', None)
        cfg['tail'] = ''
        if handle is not None:
            try:
                handle.close()
            except Exception:
                pass  # Port already gone; nothing left to release.

    def _close_serial_handles(self):
        """Close every cached serial handle; called when the monitor loop exits."""
        with self._lock:
            ports = list(self._estop_serial_ports)
        for cfg in ports:
            self._close_serial(cfg)

    def _gossip_estop(self, reason: str, source: str):
        """Best-effort gossip notification of E-stop event."""
        try:
            from integrations.social.peer_discovery import gossip
            gossip.broadcast({
                'type': 'node_estop',
                'reason': reason,
                'source': source,
                'timestamp': time.time(),
            })
        except Exception as e:
            # Gossip failure must not block safety; keep a trace for debugging.
            logger.debug(f"Safety: E-stop gossip failed: {e}")