Coverage for integrations / robotics / safety_monitor.py: 86.1%

180 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Safety Monitor — E-Stop + Operational Domain Enforcement 

3 

4Safety first. A robot that moves but has no emergency stop is dangerous. 

5A GPIO pin going LOW must halt all motor output within 50ms, not wait 

6for a gossip round. 

7 

8Design principles: 

9 1. Safety is local-first. E-stop does NOT require network, master key, 

10 or agent approval. 

11 2. Agents cannot clear E-stop. Only human operators can. 

12 3. Every motor command passes through workspace limit checks. 

13 4. Fleet notification (gossip) is informational, not a gate. 

14 

15Usage: 

16 from integrations.robotics.safety_monitor import SafetyMonitor 

17 monitor = SafetyMonitor() 

18 monitor.register_estop_pin(17) # GPIO 17 as hardware E-stop 

19 monitor.register_workspace_limits({'x': (-1.0, 1.0), 'y': (-0.5, 0.5)}) 

20 monitor.start() 

21""" 

22import logging 

23import os 

24import re 

25import threading 

26import time 

27from typing import Callable, Dict, List, Optional, Tuple 

28 

# Shared logger for the robotics integration.
logger = logging.getLogger('hevolve_robotics')

# Process-wide SafetyMonitor instance, created lazily on first request.
_monitor = None
# Serializes first-time construction of the singleton.
_monitor_lock = threading.Lock()


def get_safety_monitor() -> 'SafetyMonitor':
    """Return the process-wide SafetyMonitor, building it on first use.

    The module-level instance is checked once without the lock (fast path)
    and checked again while holding ``_monitor_lock`` before construction,
    so concurrent first callers end up sharing a single instance.
    """
    global _monitor
    existing = _monitor
    if existing is not None:
        return existing
    with _monitor_lock:
        if _monitor is None:
            _monitor = SafetyMonitor()
        return _monitor

44 

45 

class SafetyMonitor:
    """E-Stop + Operational Domain Monitor.

    Holds the emergency-stop state, the configured workspace/joint limits,
    the registered hardware E-stop sources and a bounded audit trail.
    Shared state is guarded by a re-entrant lock. When hardware sources are
    registered, ``start()`` runs a daemon thread that polls them at
    ``MONITOR_HZ``.
    """

    MONITOR_HZ = 20  # 50ms poll interval

    def __init__(self):
        # Re-entrant so helpers may be called while the lock is already held.
        self._lock = threading.RLock()
        self._estop_active = False
        self._estop_reason = ''
        self._estop_source = ''
        self._estop_timestamp = 0.0
        self._cleared_by = ''

        # Workspace limits: {axis: (min, max)}
        self._workspace_limits: Dict[str, Tuple[float, float]] = {}
        self._joint_limits: Dict[str, Tuple[float, float]] = {}

        # Registered E-stop sources
        self._estop_gpio_pins: List[int] = []
        # [{'port': str, 'pattern': str, 'compiled': re.Pattern}]
        self._estop_serial_ports: List[Dict] = []

        # Callbacks for E-stop events
        self._on_estop_callbacks: List[Callable] = []

        # Audit trail (bounded to the most recent _max_audit entries)
        self._audit_trail: List[Dict] = []
        self._max_audit = 100

        # Sources whose read failure has already been reported at WARNING
        # level; later failures of the same source are logged at DEBUG so a
        # broken source does not flood the log at the poll rate.
        self._warned_sources = set()

        # Monitor thread
        self._monitor_thread: Optional[threading.Thread] = None
        self._running = False

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register_estop_pin(self, pin: int):
        """Register a GPIO pin as hardware E-stop source.

        When the pin reads LOW (pulled to ground), E-stop triggers.
        Registering the same pin twice is a no-op.
        """
        with self._lock:
            if pin not in self._estop_gpio_pins:
                self._estop_gpio_pins.append(pin)
                logger.info(f"Safety: registered E-stop GPIO pin {pin}")

    def register_estop_serial(self, port: str, trigger_pattern: str = 'ESTOP'):
        """Register a serial port as E-stop source.

        When the trigger pattern is received on the serial port, E-stop
        triggers. The pattern is matched literally (it is regex-escaped).
        Registering the same (port, pattern) pair twice is a no-op, matching
        the behavior of ``register_estop_pin``.
        """
        with self._lock:
            already_registered = any(
                cfg['port'] == port and cfg['pattern'] == trigger_pattern
                for cfg in self._estop_serial_ports
            )
            if already_registered:
                return
            self._estop_serial_ports.append({
                'port': port,
                'pattern': trigger_pattern,
                'compiled': re.compile(re.escape(trigger_pattern)),
            })
            logger.info(f"Safety: registered E-stop serial {port} (pattern={trigger_pattern})")

    @staticmethod
    def _parse_bounds(bounds) -> Optional[Tuple[float, float]]:
        """Return ``(lo, hi)`` as floats for a 2-item list/tuple, else None."""
        if isinstance(bounds, (list, tuple)) and len(bounds) == 2:
            return float(bounds[0]), float(bounds[1])
        return None

    def register_workspace_limits(self, limits: Dict):
        """Define operational domain bounds.

        The caller's dict is read only; it is not modified.

        Args:
            limits: Dict with axis limits and optional joint limits.
                {
                    'x': (-1.0, 1.0),
                    'y': (-0.5, 0.5),
                    'z': (0.0, 1.2),
                    'joint_limits': {'joint_0': (-90, 90), 'joint_1': (0, 180)}
                }
                Entries whose value is not a 2-item list/tuple are ignored
                and reported with a warning.

        Raises:
            TypeError, ValueError: If a bound cannot be converted to float.
        """
        with self._lock:
            for axis, bounds in limits.items():
                if axis == 'joint_limits':
                    continue  # handled below; never treated as an axis
                parsed = self._parse_bounds(bounds)
                if parsed is None:
                    logger.warning(
                        f"Safety: ignoring malformed workspace limit for '{axis}': {bounds!r}"
                    )
                    continue
                self._workspace_limits[axis] = parsed

            # .get() instead of .pop(): do not mutate the caller's dict.
            for joint, bounds in (limits.get('joint_limits') or {}).items():
                parsed = self._parse_bounds(bounds)
                if parsed is None:
                    logger.warning(
                        f"Safety: ignoring malformed joint limit for '{joint}': {bounds!r}"
                    )
                    continue
                self._joint_limits[joint] = parsed

            logger.info(
                f"Safety: workspace limits set — "
                f"axes={list(self._workspace_limits.keys())}, "
                f"joints={list(self._joint_limits.keys())}"
            )

    def on_estop(self, callback: Callable):
        """Register a callback for E-stop events.

        Callback receives (reason: str, source: str).
        """
        with self._lock:
            self._on_estop_callbacks.append(callback)

    # ------------------------------------------------------------------
    # Operational domain checks
    # ------------------------------------------------------------------

    def check_position_safe(self, position: Dict) -> bool:
        """Validate a target position against workspace limits.

        Args:
            position: Dict with axis values, e.g. {'x': 0.5, 'y': 0.2, 'z': 0.8}
                and/or joint values, e.g. {'joint_0': 45.0}. Keys without a
                configured limit, and keys whose value is None, are not
                checked.

        Returns:
            True if position is within all configured limits. False if the
            E-stop is active, if any checked value is outside its limits,
            is NaN, or cannot be converted to a number.
        """
        if self._estop_active:
            return False

        with self._lock:
            checks = (
                ('position', self._workspace_limits),  # Cartesian axes
                ('joint', self._joint_limits),
            )
            for label, limit_map in checks:
                for name, (lo, hi) in limit_map.items():
                    raw = position.get(name)
                    if raw is None:
                        continue
                    try:
                        value = float(raw)
                    except (TypeError, ValueError):
                        logger.warning(
                            f"Safety: {label} {name}={raw!r} is not numeric"
                        )
                        return False
                    # Written as a positive range test so that NaN (for which
                    # every comparison is False) is rejected, not accepted.
                    if not (lo <= value <= hi):
                        logger.warning(
                            f"Safety: {label} {name}={raw} outside limits [{lo}, {hi}]"
                        )
                        return False

        return True

    # ------------------------------------------------------------------
    # E-stop state
    # ------------------------------------------------------------------

    def _record_audit(self, entry: Dict) -> None:
        """Append an audit entry and trim the trail. Caller holds the lock."""
        self._audit_trail.append(entry)
        if len(self._audit_trail) > self._max_audit:
            self._audit_trail = self._audit_trail[-self._max_audit:]

    def trigger_estop(self, reason: str, source: str = 'manual'):
        """Trigger emergency stop. Immediate. No network dependency.

        Sets HEVOLVE_HALTED env var, fires callbacks, logs to audit trail.
        Fleet notification is informational (best-effort gossip). Does
        nothing if the E-stop is already active.

        Args:
            reason: Human-readable reason for the E-stop.
            source: Source identifier ('gpio_17', 'serial_/dev/ttyUSB0', 'manual', 'fleet').
        """
        with self._lock:
            if self._estop_active:
                return  # Already stopped

            self._estop_active = True
            self._estop_reason = reason
            self._estop_source = source
            self._estop_timestamp = time.time()
            self._cleared_by = ''

            # Set halt flag immediately
            os.environ['HEVOLVE_HALTED'] = 'true'
            os.environ['HEVOLVE_HALT_REASON'] = f'E-STOP: {reason}'

            self._record_audit({
                'event': 'estop_triggered',
                'reason': reason,
                'source': source,
                'timestamp': self._estop_timestamp,
            })

            # Snapshot under the lock so a concurrent on_estop() cannot
            # change the list while it is being iterated below.
            callbacks = list(self._on_estop_callbacks)

        logger.critical(f"E-STOP TRIGGERED: {reason} (source={source})")

        # Fire callbacks (outside lock); one failing callback must not
        # prevent the others from running.
        for cb in callbacks:
            try:
                cb(reason, source)
            except Exception as e:
                logger.error(f"Safety callback error: {e}")

        # Local halt via HiveCircuitBreaker (no master key needed)
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            HiveCircuitBreaker.local_halt(f'E-STOP: {reason}')
        except (ImportError, AttributeError):
            pass  # Guardrails module not present in this deployment
        except Exception as e:
            # The local E-stop state is already set; report and carry on so
            # the fleet notification below still goes out.
            logger.error(f"Safety: local halt via HiveCircuitBreaker failed: {e}")

        # Best-effort gossip notification (informational)
        self._gossip_estop(reason, source)

    def clear_estop(self, operator_id: str) -> bool:
        """Clear E-stop. HUMAN ONLY. Agents cannot clear.

        Args:
            operator_id: Human operator identifier. Must be a non-blank
                string and must not start (ignoring case and surrounding
                whitespace) with 'agent_', 'bot_', 'system_' or 'auto_'.

        Returns:
            True if E-stop was cleared (or was not active), False if rejected.
        """
        if not isinstance(operator_id, str) or not operator_id.strip():
            logger.warning("Safety: E-stop clear rejected — no operator_id")
            return False

        # Reject agent-initiated clears. Strip first so a padded id such as
        # ' agent_7' cannot slip past the prefix check.
        normalized_id = operator_id.strip().lower()
        if normalized_id.startswith(('agent_', 'bot_', 'system_', 'auto_')):
            logger.warning(
                f"Safety: E-stop clear rejected — "
                f"operator '{operator_id}' appears to be an agent, not a human"
            )
            return False

        with self._lock:
            if not self._estop_active:
                return True  # Already clear

            self._estop_active = False
            self._cleared_by = operator_id

            os.environ.pop('HEVOLVE_HALTED', None)
            os.environ.pop('HEVOLVE_HALT_REASON', None)

            self._record_audit({
                'event': 'estop_cleared',
                'operator_id': operator_id,
                'previous_reason': self._estop_reason,
                'timestamp': time.time(),
            })

        logger.info(f"E-STOP CLEARED by operator: {operator_id}")
        return True

    @property
    def is_estopped(self) -> bool:
        """Check if E-stop is currently active."""
        return self._estop_active

    def get_safety_status(self) -> Dict:
        """Get current safety status for dashboard/fleet reporting.

        Returns copies of the limit maps and source lists, plus the ten
        most recent audit entries.
        """
        with self._lock:
            return {
                'estop_active': self._estop_active,
                'estop_reason': self._estop_reason,
                'estop_source': self._estop_source,
                'estop_timestamp': self._estop_timestamp,
                'cleared_by': self._cleared_by,
                'workspace_limits': dict(self._workspace_limits),
                'joint_limits': dict(self._joint_limits),
                'estop_gpio_pins': list(self._estop_gpio_pins),
                # The compiled regex is internal; report only port + pattern.
                'estop_serial_ports': [
                    {'port': s['port'], 'pattern': s['pattern']}
                    for s in self._estop_serial_ports
                ],
                'audit_trail': list(self._audit_trail[-10:]),
                'monitor_running': self._running,
            }

    # ------------------------------------------------------------------
    # Monitor thread
    # ------------------------------------------------------------------

    def start(self):
        """Start the E-stop monitor thread (20Hz).

        Only starts if GPIO or serial E-stop sources are registered.
        Calling it while the monitor is already running is a no-op.
        """
        with self._lock:
            if self._running:
                return
            if not self._estop_gpio_pins and not self._estop_serial_ports:
                logger.info("Safety: no E-stop sources registered, monitor not started")
                return
            self._running = True

        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            name='safety_estop_monitor',
            daemon=True,
        )
        self._monitor_thread.start()
        logger.info(f"Safety: E-stop monitor started at {self.MONITOR_HZ}Hz")

    def stop(self):
        """Stop the E-stop monitor thread.

        Waits up to one second for the thread to exit. Safe to call from
        the monitor thread itself (e.g. from an E-stop callback); in that
        case the join is skipped because a thread cannot join itself.
        """
        self._running = False
        worker = self._monitor_thread
        if (
            worker is not None
            and worker is not threading.current_thread()
            and worker.is_alive()
        ):
            worker.join(timeout=1.0)
        logger.info("Safety: E-stop monitor stopped")

    def _monitor_loop(self):
        """Poll GPIO and serial E-stop sources at MONITOR_HZ."""
        interval = 1.0 / self.MONITOR_HZ
        checks = (self._check_gpio_estops, self._check_serial_estops)
        while self._running:
            started = time.monotonic()
            # Each source type is guarded separately so that a failing GPIO
            # backend cannot prevent the serial sources from being polled.
            for check in checks:
                try:
                    check()
                except Exception as e:
                    logger.error(f"Safety monitor error: {e}")
            # Sleep only for what is left of the period so time spent
            # polling does not stretch the interval.
            remaining = interval - (time.monotonic() - started)
            time.sleep(max(0.0, remaining))

    def _log_source_failure(self, source: str, error: Exception) -> None:
        """Report a failed read of an E-stop source without flooding the log.

        The first failure per source is logged at WARNING; repeats at DEBUG.
        """
        with self._lock:
            first_time = source not in self._warned_sources
            self._warned_sources.add(source)
        if first_time:
            logger.warning(f"Safety: E-stop source {source} could not be read: {error}")
        else:
            logger.debug(f"Safety: E-stop source {source} still failing: {error}")

    def _check_gpio_estops(self):
        """Check registered GPIO pins for E-stop trigger (LOW = triggered).

        Uses ``gpiod`` when importable, otherwise falls back to
        ``RPi.GPIO``. Does nothing when neither library is available.
        """
        with self._lock:
            pins = list(self._estop_gpio_pins)
        if not pins:
            return

        try:
            import gpiod
        except ImportError:
            gpiod = None

        if gpiod is not None:
            self._poll_gpiod(gpiod, pins)
            return

        # Try RPi.GPIO fallback
        try:
            import RPi.GPIO as GPIO
        except ImportError:
            return  # No GPIO library available
        self._poll_rpi_gpio(GPIO, pins)

    def _poll_gpiod(self, gpiod, pins: List[int]) -> None:
        """Read each pin through gpiod; trigger E-stop on the first LOW pin."""
        chip = gpiod.Chip('gpiochip0')
        try:
            for pin in pins:
                try:
                    line = chip.get_line(pin)
                    line.request(consumer='hart_estop', type=gpiod.LINE_REQ_DIR_IN)
                    try:
                        value = line.get_value()
                    finally:
                        # Always hand the line back, otherwise the next
                        # poll's request on the same line would fail.
                        line.release()
                except Exception as e:
                    self._log_source_failure(f'gpio_{pin}', e)
                    continue
                if value == 0:  # LOW = E-stop triggered
                    self.trigger_estop(
                        f'Hardware E-stop pin {pin} triggered (LOW)',
                        source=f'gpio_{pin}',
                    )
                    return
        finally:
            # The chip is opened on every poll; close it so handles do not
            # accumulate at the poll rate.
            chip.close()

    def _poll_rpi_gpio(self, GPIO, pins: List[int]) -> None:
        """Read each pin through RPi.GPIO; trigger E-stop on the first LOW pin."""
        for pin in pins:
            try:
                GPIO.setmode(GPIO.BCM)
                GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
                is_low = GPIO.input(pin) == GPIO.LOW
            except Exception as e:
                self._log_source_failure(f'gpio_{pin}', e)
                continue
            if is_low:
                self.trigger_estop(
                    f'Hardware E-stop pin {pin} triggered (LOW)',
                    source=f'gpio_{pin}',
                )
                return

    def _check_serial_estops(self):
        """Check registered serial ports for E-stop trigger pattern.

        Each poll opens the port, reads up to 256 bytes with a 10 ms
        timeout and closes it again. Does nothing when ``serial`` is not
        importable.
        """
        with self._lock:
            port_configs = list(self._estop_serial_ports)
        if not port_configs:
            return

        try:
            import serial
        except ImportError:
            return

        for port_config in port_configs:
            port = port_config['port']
            try:
                ser = serial.Serial(port, baudrate=9600, timeout=0.01)
                try:
                    data = ser.read(256)
                finally:
                    ser.close()  # close even when the read fails
            except Exception as e:
                self._log_source_failure(f"serial_{port}", e)
                continue

            if not data:
                continue
            text = data.decode('utf-8', errors='ignore')
            if port_config['compiled'].search(text):
                self.trigger_estop(
                    f"Serial E-stop pattern '{port_config['pattern']}' "
                    f"received on {port}",
                    source=f"serial_{port}",
                )
                return

    def _gossip_estop(self, reason: str, source: str):
        """Best-effort gossip notification of E-stop event."""
        try:
            from integrations.social.peer_discovery import gossip
            gossip.broadcast({
                'type': 'node_estop',
                'reason': reason,
                'source': source,
                'timestamp': time.time(),
            })
        except Exception as e:
            # Gossip failure must not block safety
            logger.debug(f"Safety: E-stop gossip notification skipped: {e}")