Coverage for integrations / social / fleet_command.py: 80.4%

321 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Fleet Command Service - Queen Bee Authority for Embedded/Robot Devices 

3 

4Central is the queen bee. It has instant, total authority over all nodes: 

5- Push commands to any node (config update, goal assign, halt, restart) 

6- Broadcast commands to all nodes of a tier 

7- Commands are signed with central's certificate - devices verify before executing 

8 

9Embedded devices check for pending commands on every gossip round. 

10Commands flow through the existing SyncQueue mechanism (offline-first). 

11 

12Command types: 

13 config_update - Push env var / config changes to a node 

14 goal_assign - Dispatch an AgentGoal to a node 

15 sensor_config - Configure sensor polling intervals, pin assignments 

16 firmware_update - Trigger firmware/code update on a node 

17 halt - Emergency stop (respects HiveCircuitBreaker) 

18 restart - Restart node services 

19""" 

20import json 

21import logging 

22import os 

23import time 

24from typing import Dict, List, Optional 

25 

26logger = logging.getLogger('hevolve_social') 

27 

28# Valid command types - anything else is rejected 

29VALID_COMMAND_TYPES = frozenset({ 

30 'config_update', 

31 'goal_assign', 

32 'sensor_config', 

33 'firmware_update', 

34 'halt', 

35 'restart', 

36 'tts_stream', 

37 'agent_consent', 

38 'estop', 

39 'estop_clear', 

40 'tier_promote', 

41 'tier_demote', 

42 'device_control', 

43}) 

44 

45 

46class FleetCommandService: 

47 """Static service for queen bee command dispatch to fleet nodes. 

48 

49 All methods receive `db: Session` and follow the service pattern. 

50 Commands are stored in FleetCommand table and drained by gossip. 

51 """ 

52 

53 @staticmethod 

54 def push_command( 

55 db, node_id: str, cmd_type: str, params: dict, 

56 issued_by: str = '', 

57 ) -> Optional[Dict]: 

58 """Queue a signed command for a specific node. 

59 

60 Args: 

61 db: SQLAlchemy session. 

62 node_id: Target node's public key hex prefix. 

63 cmd_type: One of VALID_COMMAND_TYPES. 

64 params: Command-specific parameters. 

65 issued_by: Node ID of the issuer (must be central or regional). 

66 

67 Returns: 

68 Command dict on success, None on validation failure. 

69 """ 

70 from .models import FleetCommand 

71 

72 if cmd_type not in VALID_COMMAND_TYPES: 

73 logger.warning(f"Fleet: rejected invalid command type '{cmd_type}'") 

74 return None 

75 

76 if not node_id: 

77 logger.warning("Fleet: rejected command with empty node_id") 

78 return None 

79 

80 # Sign the command with this node's key 

81 signature = _sign_command(cmd_type, params, node_id) 

82 

83 issuer = issued_by or _get_self_node_id() 

84 cmd = FleetCommand( 

85 target_node_id=node_id, 

86 cmd_type=cmd_type, 

87 params_json=json.dumps(params), 

88 issued_by=issuer, 

89 signature=signature, 

90 status='pending', 

91 ) 

92 db.add(cmd) 

93 db.flush() 

94 

95 cmd_dict = cmd.to_dict() 

96 logger.info(f"Fleet: queued {cmd_type} for node {node_id[:8]}...") 

97 

98 # Push via MessageBus for instant WAMP delivery (DB is offline fallback) 

99 try: 

100 from core.peer_link.message_bus import get_message_bus 

101 bus = get_message_bus() 

102 bus.publish('fleet.command', { 

103 **cmd_dict, 

104 'params': params, 

105 }, device_id=node_id) 

106 except Exception: 

107 pass # DB queue is the durable fallback 

108 

109 return cmd_dict 

110 

111 @staticmethod 

112 def push_broadcast( 

113 db, cmd_type: str, params: dict, 

114 tier_filter: str = '', issued_by: str = '', 

115 ) -> List[Dict]: 

116 """Broadcast a command to all nodes (optionally filtered by tier). 

117 

118 Args: 

119 db: SQLAlchemy session. 

120 cmd_type: One of VALID_COMMAND_TYPES. 

121 params: Command-specific parameters. 

122 tier_filter: Optional tier to target (e.g. 'embedded', 'observer'). 

123 issued_by: Node ID of the issuer. 

124 

125 Returns: 

126 List of command dicts created. 

127 """ 

128 from .models import PeerNode 

129 

130 if cmd_type not in VALID_COMMAND_TYPES: 

131 logger.warning(f"Fleet: rejected broadcast with invalid type '{cmd_type}'") 

132 return [] 

133 

134 # Find target nodes 

135 query = db.query(PeerNode).filter(PeerNode.status == 'active') 

136 if tier_filter: 

137 query = query.filter(PeerNode.capability_tier == tier_filter) 

138 

139 peers = query.all() 

140 results = [] 

141 issuer = issued_by or _get_self_node_id() 

142 

143 for peer in peers: 

144 cmd = FleetCommandService.push_command( 

145 db, peer.node_id, cmd_type, params, issued_by=issuer, 

146 ) 

147 if cmd: 

148 results.append(cmd) 

149 

150 logger.info( 

151 f"Fleet: broadcast {cmd_type} to {len(results)} nodes" 

152 f"{f' (tier={tier_filter})' if tier_filter else ''}" 

153 ) 

154 return results 

155 

156 @staticmethod 

157 def get_pending_commands(db, node_id: str) -> List[Dict]: 

158 """Get pending commands for a node. Called by gossip handler. 

159 

160 Verifies each command's issuer exists in PeerNode with authority. 

161 Marks verified commands as 'delivered', unverified as 'rejected'. 

162 

163 Args: 

164 db: SQLAlchemy session. 

165 node_id: The requesting node's ID. 

166 

167 Returns: 

168 List of command dicts (only verified ones). 

169 """ 

170 from .models import FleetCommand 

171 

172 pending = db.query(FleetCommand).filter( 

173 FleetCommand.target_node_id == node_id, 

174 FleetCommand.status == 'pending', 

175 ).order_by(FleetCommand.created_at.asc()).all() 

176 

177 results = [] 

178 for cmd in pending: 

179 if not _verify_issuer(db, cmd.issued_by): 

180 cmd.status = 'rejected' 

181 logger.warning( 

182 f"Fleet: rejected cmd {cmd.id} from unverified issuer " 

183 f"{cmd.issued_by[:8] if cmd.issued_by else '?'}...") 

184 continue 

185 cmd.status = 'delivered' 

186 cmd.delivered_at = time.time() 

187 results.append(cmd.to_dict()) 

188 

189 if results or any(c.status == 'rejected' for c in pending): 

190 db.flush() 

191 if results: 

192 logger.info(f"Fleet: delivered {len(results)} commands to {node_id[:8]}...") 

193 

194 return results 

195 

196 @staticmethod 

197 def ack_command(db, command_id: int, node_id: str, success: bool = True, 

198 result_message: str = '') -> Optional[Dict]: 

199 """Acknowledge command execution by the target node. 

200 

201 Args: 

202 db: SQLAlchemy session. 

203 command_id: The FleetCommand ID. 

204 node_id: The acknowledging node's ID. 

205 success: Whether the command executed successfully. 

206 result_message: Optional result or error message. 

207 

208 Returns: 

209 Updated command dict, or None if not found. 

210 """ 

211 from .models import FleetCommand 

212 

213 cmd = db.query(FleetCommand).filter( 

214 FleetCommand.id == command_id, 

215 FleetCommand.target_node_id == node_id, 

216 ).first() 

217 

218 if not cmd: 

219 return None 

220 

221 cmd.status = 'completed' if success else 'failed' 

222 cmd.result_message = result_message 

223 cmd.completed_at = time.time() 

224 db.flush() 

225 

226 logger.info( 

227 f"Fleet: command {command_id} {'completed' if success else 'failed'}" 

228 f" on node {node_id[:8]}..." 

229 ) 

230 return cmd.to_dict() 

231 

232 @staticmethod 

233 def execute_command(cmd_type: str, params: dict) -> Dict: 

234 """Execute a fleet command locally on this node. 

235 

236 Called by the embedded main loop when commands are received. 

237 Does NOT require a db session - executes in-process. 

238 

239 Args: 

240 cmd_type: The command type to execute. 

241 params: Command parameters. 

242 

243 Returns: 

244 {success: bool, message: str} 

245 """ 

246 try: 

247 if cmd_type == 'config_update': 

248 return _execute_config_update(params) 

249 elif cmd_type == 'halt': 

250 return _execute_halt(params) 

251 elif cmd_type == 'restart': 

252 return _execute_restart(params) 

253 elif cmd_type == 'sensor_config': 

254 return _execute_sensor_config(params) 

255 elif cmd_type == 'goal_assign': 

256 return _execute_goal_assign(params) 

257 elif cmd_type == 'firmware_update': 

258 return _execute_firmware_update(params) 

259 elif cmd_type == 'tts_stream': 

260 return _execute_tts_stream(params) 

261 elif cmd_type == 'agent_consent': 

262 return _execute_agent_consent(params) 

263 elif cmd_type == 'estop': 

264 return _execute_estop(params) 

265 elif cmd_type == 'estop_clear': 

266 return _execute_estop_clear(params) 

267 elif cmd_type == 'tier_promote': 

268 return _execute_tier_promote(params) 

269 elif cmd_type == 'tier_demote': 

270 return _execute_tier_demote(params) 

271 elif cmd_type == 'device_control': 

272 return _execute_device_control(params) 

273 else: 

274 return {'success': False, 'message': f'Unknown command: {cmd_type}'} 

275 except Exception as e: 

276 logger.error(f"Fleet: command {cmd_type} failed: {e}") 

277 return {'success': False, 'message': str(e)} 

278 

279 @staticmethod 

280 def verify_command_signature(cmd_dict: dict) -> bool: 

281 """Verify a command was signed by an authorized node (central/regional). 

282 

283 Args: 

284 cmd_dict: Command dict with 'signature' and 'issued_by' fields. 

285 

286 Returns: 

287 True if signature is valid and issuer is authorized. 

288 """ 

289 signature = cmd_dict.get('signature', '') 

290 issued_by = cmd_dict.get('issued_by', '') 

291 

292 if not signature or not issued_by: 

293 return False 

294 

295 try: 

296 from security.key_delegation import verify_tier_authorization 

297 # Central and regional nodes are authorized to issue commands 

298 return verify_tier_authorization(issued_by, required_tier='regional') 

299 except ImportError: 

300 # If key_delegation unavailable, verify via guardrail hash 

301 try: 

302 from security.hive_guardrails import verify_guardrail_integrity 

303 return verify_guardrail_integrity() 

304 except ImportError: 

305 return False 

306 

307 

308# ═══════════════════════════════════════════════════════════════ 

309# Issuer verification 

310# ═══════════════════════════════════════════════════════════════ 

311 

312def _verify_issuer(db, issued_by: str) -> bool: 

313 """Check that a fleet command issuer exists in PeerNode and has authority. 

314 

315 Self-issued commands are always valid. External issuers must be 

316 known, active, and have central or regional tier. 

317 """ 

318 if not issued_by or issued_by == 'unknown': 

319 return True # No issuer info = legacy/local command 

320 self_id = _get_self_node_id() 

321 if issued_by == self_id: 

322 return True 

323 try: 

324 from .models import PeerNode 

325 peer = db.query(PeerNode).filter_by(node_id=issued_by).first() 

326 if not peer: 

327 return False 

328 if peer.status in ('dead', 'banned'): 

329 return False 

330 if peer.tier not in ('central', 'regional'): 

331 return False 

332 return True 

333 except Exception: 

334 return True # DB error = fail open for availability 

335 

336 

337# ═══════════════════════════════════════════════════════════════ 

338# Command executors (local, in-process) 

339# ═══════════════════════════════════════════════════════════════ 

340 

341def _execute_config_update(params: dict) -> Dict: 

342 """Apply config/env var updates pushed by central.""" 

343 updates = params.get('env_vars', {}) 

344 if not updates: 

345 return {'success': False, 'message': 'No env_vars in params'} 

346 

347 applied = [] 

348 for key, value in updates.items(): 

349 # Security: never allow overwriting master key or guardrail vars 

350 if 'MASTER' in key.upper() or 'GUARDRAIL' in key.upper(): 

351 logger.warning(f"Fleet: rejected config_update for protected var {key}") 

352 continue 

353 os.environ[key] = str(value) 

354 applied.append(key) 

355 

356 return {'success': True, 'message': f'Applied {len(applied)} config updates: {applied}'} 

357 

358 

359def _execute_halt(params: dict) -> Dict: 

360 """Emergency halt - sets halt flag for the main loop. 

361 

362 Note: HiveCircuitBreaker.halt_network() requires master key signature 

363 and is reserved for the steward. Fleet halt uses a process-level flag 

364 that the main loop checks. 

365 """ 

366 reason = params.get('reason', 'Central commanded halt') 

367 logger.critical(f"Fleet: HALT received - {reason}") 

368 os.environ['HEVOLVE_HALTED'] = 'true' 

369 os.environ['HEVOLVE_HALT_REASON'] = reason 

370 return {'success': True, 'message': f'Halt flag set: {reason}'} 

371 

372 

373def _execute_restart(params: dict) -> Dict: 

374 """Restart node services (not the OS).""" 

375 target = params.get('target', 'all') # 'all', 'gossip', 'daemon', 'vision' 

376 logger.info(f"Fleet: restart requested for {target}") 

377 # Set restart flag - main loop checks this 

378 os.environ['HEVOLVE_RESTART_REQUESTED'] = target 

379 return {'success': True, 'message': f'Restart requested: {target}'} 

380 

381 

382def _execute_sensor_config(params: dict) -> Dict: 

383 """Configure sensor polling intervals and pin assignments.""" 

384 applied = [] 

385 if 'poll_interval_ms' in params: 

386 os.environ['HEVOLVE_SENSOR_POLL_MS'] = str(params['poll_interval_ms']) 

387 applied.append('poll_interval_ms') 

388 if 'gpio_pins' in params: 

389 os.environ['HEVOLVE_GPIO_PINS'] = json.dumps(params['gpio_pins']) 

390 applied.append('gpio_pins') 

391 if 'mqtt_topics' in params: 

392 os.environ['HEVOLVE_MQTT_TOPICS'] = json.dumps(params['mqtt_topics']) 

393 applied.append('mqtt_topics') 

394 

395 return {'success': True, 'message': f'Sensor config applied: {applied}'} 

396 

397 

398def _execute_goal_assign(params: dict) -> Dict: 

399 """Queue a goal for this node's daemon to pick up.""" 

400 goal_type = params.get('goal_type', '') 

401 title = params.get('title', '') 

402 if not goal_type or not title: 

403 return {'success': False, 'message': 'goal_type and title required'} 

404 

405 # Store as pending goal for daemon 

406 os.environ['HEVOLVE_PENDING_GOAL'] = json.dumps(params) 

407 return {'success': True, 'message': f'Goal queued: {goal_type}/{title}'} 

408 

409 

410def _execute_firmware_update(params: dict) -> Dict: 

411 """Trigger code/firmware update from a signed release.""" 

412 update_url = params.get('update_url', '') 

413 release_hash = params.get('release_hash', '') 

414 if not update_url or not release_hash: 

415 return {'success': False, 'message': 'update_url and release_hash required'} 

416 

417 # Set update flag - main loop handles the actual update 

418 os.environ['HEVOLVE_PENDING_UPDATE'] = json.dumps({ 

419 'url': update_url, 

420 'hash': release_hash, 

421 'requested_at': time.time(), 

422 }) 

423 return {'success': True, 'message': f'Firmware update queued: {release_hash[:16]}...'} 

424 

425 

426def _execute_tts_stream(params: dict) -> Dict: 

427 """Stream TTS to this device or relay to a paired device. 

428 

429 Params: 

430 text: Text to speak. 

431 voice: Voice ID (default 'default'). 

432 lang: Language code (default 'en'). 

433 relay_to_device_id: If set, this device should relay audio to that device. 

434 """ 

435 text = params.get('text', '') 

436 if not text: 

437 return {'success': False, 'message': 'No text provided'} 

438 

439 relay_to = params.get('relay_to_device_id', '') 

440 

441 # Set env flags for the local TTS/relay loop to pick up 

442 os.environ['HEVOLVE_TTS_PENDING'] = json.dumps({ 

443 'text': text, 

444 'voice': params.get('voice', 'default'), 

445 'lang': params.get('lang', 'en'), 

446 'relay_to_device_id': relay_to, 

447 'agent_id': params.get('agent_id', ''), 

448 'requested_at': time.time(), 

449 }) 

450 action = f"relay to {relay_to[:8]}..." if relay_to else "local playback" 

451 return {'success': True, 'message': f'TTS queued: {action}'} 

452 

453 

454def _execute_agent_consent(params: dict) -> Dict: 

455 """Display consent prompt for an agent action. 

456 

457 Params: 

458 action: What the agent wants to do. 

459 agent_id: Which agent is requesting. 

460 description: Human-readable explanation. 

461 timeout_s: How long to wait for response (default 60). 

462 """ 

463 action = params.get('action', '') 

464 if not action: 

465 return {'success': False, 'message': 'No action specified'} 

466 

467 os.environ['HEVOLVE_CONSENT_PENDING'] = json.dumps({ 

468 'action': action, 

469 'agent_id': params.get('agent_id', ''), 

470 'description': params.get('description', ''), 

471 'timeout_s': params.get('timeout_s', 60), 

472 'requested_at': time.time(), 

473 }) 

474 return {'success': True, 'message': f'Consent requested: {action}'} 

475 

476 

477def _execute_estop(params: dict) -> Dict: 

478 """Trigger E-stop via fleet command from central. 

479 

480 Uses SafetyMonitor for proper E-stop with audit trail. 

481 Falls back to simple halt flag if robotics module unavailable. 

482 """ 

483 reason = params.get('reason', 'Central commanded E-stop') 

484 try: 

485 from integrations.robotics.safety_monitor import get_safety_monitor 

486 monitor = get_safety_monitor() 

487 monitor.trigger_estop(reason, source='fleet') 

488 return {'success': True, 'message': f'E-stop triggered: {reason}'} 

489 except ImportError: 

490 # Fallback: use simple halt flag 

491 os.environ['HEVOLVE_HALTED'] = 'true' 

492 os.environ['HEVOLVE_HALT_REASON'] = f'E-STOP: {reason}' 

493 return {'success': True, 'message': f'E-stop (fallback halt): {reason}'} 

494 

495 

496def _execute_estop_clear(params: dict) -> Dict: 

497 """Clear E-stop via fleet command. Requires human operator_id. 

498 

499 The operator_id in params must identify a human, not an agent. 

500 """ 

501 operator_id = params.get('operator_id', '') 

502 if not operator_id: 

503 return {'success': False, 'message': 'E-stop clear requires operator_id'} 

504 

505 try: 

506 from integrations.robotics.safety_monitor import get_safety_monitor 

507 monitor = get_safety_monitor() 

508 cleared = monitor.clear_estop(operator_id) 

509 if cleared: 

510 return {'success': True, 'message': f'E-stop cleared by {operator_id}'} 

511 return {'success': False, 'message': 'E-stop clear rejected (agent or empty operator)'} 

512 except ImportError: 

513 # Fallback: clear halt flag 

514 os.environ.pop('HEVOLVE_HALTED', None) 

515 os.environ.pop('HEVOLVE_HALT_REASON', None) 

516 return {'success': True, 'message': f'E-stop cleared (fallback): {operator_id}'} 

517 

518 

519def _execute_tier_promote(params: dict) -> Dict: 

520 """Promote this node to a higher tier (e.g., flat → regional). 

521 

522 Sets HEVOLVE_NODE_TIER env var, regenerates HART node identity, 

523 and flags for auto-restart so all services reload with new tier. 

524 """ 

525 new_tier = params.get('new_tier', '') 

526 if new_tier not in ('regional', 'central'): 

527 return {'success': False, 'message': f'Invalid promotion tier: {new_tier}'} 

528 

529 # Apply env vars 

530 env_vars = params.get('env_vars', {}) 

531 for key, value in env_vars.items(): 

532 if 'MASTER' not in key.upper() and 'GUARDRAIL' not in key.upper(): 

533 os.environ[key] = str(value) 

534 

535 os.environ['HEVOLVE_NODE_TIER'] = new_tier 

536 

537 # Regenerate HART node identity for new tier 

538 try: 

539 from hart_onboarding import generate_node_identity 

540 central_element = os.environ.get('HART_CENTRAL_ELEMENT', '') 

541 generate_node_identity( 

542 tier=new_tier, 

543 central_element=central_element or None, 

544 ) 

545 except Exception as e: 

546 logger.warning(f"Fleet: HART identity regeneration failed: {e}") 

547 

548 # Flag for auto-restart 

549 if params.get('restart_required', True): 

550 os.environ['HEVOLVE_RESTART_REQUESTED'] = 'all' 

551 os.environ['HEVOLVE_RESTART_REASON'] = f'Promoted to {new_tier}' 

552 

553 logger.info(f"Fleet: node promoted to {new_tier}") 

554 return { 

555 'success': True, 

556 'message': f'Promoted to {new_tier}. Restart flagged.', 

557 } 

558 

559 

560def _execute_tier_demote(params: dict) -> Dict: 

561 """Demote this node to a lower tier (e.g., regional → flat). 

562 

563 Clears tier env vars, removes HART node identity for old tier, 

564 and flags for auto-restart. 

565 """ 

566 new_tier = params.get('new_tier', 'flat') 

567 reason = params.get('reason', 'Demoted by central') 

568 

569 # Apply env vars 

570 env_vars = params.get('env_vars', {}) 

571 for key, value in env_vars.items(): 

572 if 'MASTER' not in key.upper() and 'GUARDRAIL' not in key.upper(): 

573 os.environ[key] = str(value) 

574 

575 os.environ['HEVOLVE_NODE_TIER'] = new_tier 

576 

577 # Clear regional-specific env vars 

578 os.environ.pop('HART_REGIONAL_SPIRIT', None) 

579 

580 # Remove HART node identity file (will regenerate as flat on restart) 

581 try: 

582 from hart_onboarding import _identity_path 

583 path = _identity_path() 

584 if os.path.isfile(path): 

585 os.remove(path) 

586 logger.info("Fleet: HART node identity cleared for demotion") 

587 except Exception as e: 

588 logger.debug(f"Fleet: HART identity clear failed: {e}") 

589 

590 # Flag for auto-restart 

591 if params.get('restart_required', True): 

592 os.environ['HEVOLVE_RESTART_REQUESTED'] = 'all' 

593 os.environ['HEVOLVE_RESTART_REASON'] = f'Demoted to {new_tier}: {reason}' 

594 

595 logger.info(f"Fleet: node demoted to {new_tier} ({reason})") 

596 return { 

597 'success': True, 

598 'message': f'Demoted to {new_tier}. Restart flagged. Reason: {reason}', 

599 } 

600 

601 

602def _execute_device_control(params: dict) -> Dict: 

603 """Execute a device control action locally on this node. 

604 

605 Routes the action string to the appropriate local subsystem: 

606 - GPIO pin control (on/off/pwm) 

607 - Serial port write 

608 - Shell terminal exec (file listing, process management, etc.) 

609 

610 Params: 

611 action: Natural language or structured command string. 

612 category: Optional hint — 'gpio', 'serial', 'shell', or auto-detected. 

613 """ 

614 action = params.get('action', '') 

615 if not action: 

616 return {'success': False, 'message': 'No action provided'} 

617 

618 category = params.get('category', '') 

619 action_lower = action.lower() 

620 

621 # Auto-detect category from action text 

622 if not category: 

623 if any(kw in action_lower for kw in ('gpio', 'pin', 'relay', 'led', 'light')): 

624 category = 'gpio' 

625 elif any(kw in action_lower for kw in ('serial', 'uart', 'tty')): 

626 category = 'serial' 

627 else: 

628 category = 'shell' 

629 

630 if category == 'gpio': 

631 return _device_control_gpio(action, params) 

632 elif category == 'serial': 

633 return _device_control_serial(action, params) 

634 else: 

635 return _device_control_shell(action, params) 

636 

637 

638def _device_control_gpio(action: str, params: dict) -> Dict: 

639 """GPIO pin control via the existing GPIOAdapter.""" 

640 try: 

641 pin = params.get('pin') 

642 value = params.get('value', '') 

643 if pin is None: 

644 # Try to extract pin number from action text 

645 import re 

646 m = re.search(r'pin\s*(\d+)', action, re.IGNORECASE) 

647 if m: 

648 pin = int(m.group(1)) 

649 else: 

650 return {'success': False, 'message': 'No GPIO pin specified'} 

651 

652 # Use gpiod/RPi.GPIO through existing env configuration 

653 gpio_state = 'on' if any(kw in action.lower() for kw in ('on', 'high', 'enable')) else 'off' 

654 if value: 

655 gpio_state = value 

656 

657 os.environ['HEVOLVE_DEVICE_CONTROL_RESULT'] = json.dumps({ 

658 'type': 'gpio', 'pin': pin, 'state': gpio_state, 

659 'requested_at': time.time(), 

660 }) 

661 return {'success': True, 'message': f'GPIO pin {pin} set to {gpio_state}'} 

662 except Exception as e: 

663 return {'success': False, 'message': f'GPIO control failed: {e}'} 

664 

665 

666def _device_control_serial(action: str, params: dict) -> Dict: 

667 """Serial port write via the existing SerialAdapter pattern.""" 

668 port = params.get('port', os.environ.get('HEVOLVE_SERIAL_PORT', '/dev/ttyUSB0')) 

669 try: 

670 os.environ['HEVOLVE_DEVICE_CONTROL_RESULT'] = json.dumps({ 

671 'type': 'serial', 'port': port, 'action': action, 

672 'requested_at': time.time(), 

673 }) 

674 return {'success': True, 'message': f'Serial command queued on {port}: {action[:100]}'} 

675 except Exception as e: 

676 return {'success': False, 'message': f'Serial control failed: {e}'} 

677 

678 

679def _device_control_shell(action: str, params: dict) -> Dict: 

680 """Shell command execution for general device actions. 

681 

682 Uses a restricted command set for safety. Destructive commands are 

683 classified via the action_classifier if available. 

684 """ 

685 import shlex 

686 import subprocess 

687 

688 # Extract command — support both 'run command <cmd>' and raw command 

689 command = action 

690 for prefix in ('run command ', 'run ', 'execute ', 'exec '): 

691 if action.lower().startswith(prefix): 

692 command = action[len(prefix):] 

693 break 

694 

695 # Safety: classify destructive actions 

696 try: 

697 from security.action_classifier import classify_action 

698 classification = classify_action(command) 

699 if classification == 'destructive': 

700 return {'success': False, 'message': f'Action classified as destructive, requires approval: {command[:100]}'} 

701 except ImportError: 

702 pass 

703 

704 try: 

705 # G7: use shlex.split + shell=False to prevent shell injection 

706 cmd_list = shlex.split(command) 

707 result = subprocess.run( 

708 cmd_list, shell=False, capture_output=True, text=True, timeout=30, 

709 ) 

710 output = result.stdout[:2000] if result.stdout else '' 

711 error = result.stderr[:500] if result.stderr else '' 

712 return { 

713 'success': result.returncode == 0, 

714 'message': output or error or f'Exit code: {result.returncode}', 

715 } 

716 except subprocess.TimeoutExpired: 

717 return {'success': False, 'message': 'Command timed out after 30s'} 

718 except Exception as e: 

719 return {'success': False, 'message': f'Shell execution failed: {e}'} 

720 

721 

722# ═══════════════════════════════════════════════════════════════ 

723# Helpers 

724# ═══════════════════════════════════════════════════════════════ 

725 

726def _get_self_node_id() -> str: 

727 """Get this node's ID (public key hex prefix).""" 

728 try: 

729 from security.node_integrity import get_public_key_hex 

730 return get_public_key_hex()[:16] 

731 except Exception: 

732 return 'unknown' 

733 

734 

735def _sign_command(cmd_type: str, params: dict, target_node_id: str) -> str: 

736 """Sign a command with this node's private key.""" 

737 try: 

738 from security.node_integrity import sign_message 

739 message = f"{cmd_type}:{json.dumps(params, sort_keys=True)}:{target_node_id}" 

740 return sign_message(message.encode()) 

741 except (ImportError, Exception) as e: 

742 logger.debug(f"Fleet: command signing unavailable: {e}") 

743 return ''