Coverage for integrations / social / fleet_command.py: 80.4%
321 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Fleet Command Service - Queen Bee Authority for Embedded/Robot Devices
4Central is the queen bee. It has instant, total authority over all nodes:
5- Push commands to any node (config update, goal assign, halt, restart)
6- Broadcast commands to all nodes of a tier
7- Commands are signed with central's certificate - devices verify before executing
9Embedded devices check for pending commands on every gossip round.
10Commands flow through the existing SyncQueue mechanism (offline-first).
12Command types:
13 config_update - Push env var / config changes to a node
14 goal_assign - Dispatch an AgentGoal to a node
15 sensor_config - Configure sensor polling intervals, pin assignments
16 firmware_update - Trigger firmware/code update on a node
17 halt - Emergency stop (respects HiveCircuitBreaker)
18 restart - Restart node services
19"""
20import json
21import logging
22import os
23import time
24from typing import Dict, List, Optional
26logger = logging.getLogger('hevolve_social')
28# Valid command types - anything else is rejected
29VALID_COMMAND_TYPES = frozenset({
30 'config_update',
31 'goal_assign',
32 'sensor_config',
33 'firmware_update',
34 'halt',
35 'restart',
36 'tts_stream',
37 'agent_consent',
38 'estop',
39 'estop_clear',
40 'tier_promote',
41 'tier_demote',
42 'device_control',
43})
46class FleetCommandService:
47 """Static service for queen bee command dispatch to fleet nodes.
49 All methods receive `db: Session` and follow the service pattern.
50 Commands are stored in FleetCommand table and drained by gossip.
51 """
53 @staticmethod
54 def push_command(
55 db, node_id: str, cmd_type: str, params: dict,
56 issued_by: str = '',
57 ) -> Optional[Dict]:
58 """Queue a signed command for a specific node.
60 Args:
61 db: SQLAlchemy session.
62 node_id: Target node's public key hex prefix.
63 cmd_type: One of VALID_COMMAND_TYPES.
64 params: Command-specific parameters.
65 issued_by: Node ID of the issuer (must be central or regional).
67 Returns:
68 Command dict on success, None on validation failure.
69 """
70 from .models import FleetCommand
72 if cmd_type not in VALID_COMMAND_TYPES:
73 logger.warning(f"Fleet: rejected invalid command type '{cmd_type}'")
74 return None
76 if not node_id:
77 logger.warning("Fleet: rejected command with empty node_id")
78 return None
80 # Sign the command with this node's key
81 signature = _sign_command(cmd_type, params, node_id)
83 issuer = issued_by or _get_self_node_id()
84 cmd = FleetCommand(
85 target_node_id=node_id,
86 cmd_type=cmd_type,
87 params_json=json.dumps(params),
88 issued_by=issuer,
89 signature=signature,
90 status='pending',
91 )
92 db.add(cmd)
93 db.flush()
95 cmd_dict = cmd.to_dict()
96 logger.info(f"Fleet: queued {cmd_type} for node {node_id[:8]}...")
98 # Push via MessageBus for instant WAMP delivery (DB is offline fallback)
99 try:
100 from core.peer_link.message_bus import get_message_bus
101 bus = get_message_bus()
102 bus.publish('fleet.command', {
103 **cmd_dict,
104 'params': params,
105 }, device_id=node_id)
106 except Exception:
107 pass # DB queue is the durable fallback
109 return cmd_dict
111 @staticmethod
112 def push_broadcast(
113 db, cmd_type: str, params: dict,
114 tier_filter: str = '', issued_by: str = '',
115 ) -> List[Dict]:
116 """Broadcast a command to all nodes (optionally filtered by tier).
118 Args:
119 db: SQLAlchemy session.
120 cmd_type: One of VALID_COMMAND_TYPES.
121 params: Command-specific parameters.
122 tier_filter: Optional tier to target (e.g. 'embedded', 'observer').
123 issued_by: Node ID of the issuer.
125 Returns:
126 List of command dicts created.
127 """
128 from .models import PeerNode
130 if cmd_type not in VALID_COMMAND_TYPES:
131 logger.warning(f"Fleet: rejected broadcast with invalid type '{cmd_type}'")
132 return []
134 # Find target nodes
135 query = db.query(PeerNode).filter(PeerNode.status == 'active')
136 if tier_filter:
137 query = query.filter(PeerNode.capability_tier == tier_filter)
139 peers = query.all()
140 results = []
141 issuer = issued_by or _get_self_node_id()
143 for peer in peers:
144 cmd = FleetCommandService.push_command(
145 db, peer.node_id, cmd_type, params, issued_by=issuer,
146 )
147 if cmd:
148 results.append(cmd)
150 logger.info(
151 f"Fleet: broadcast {cmd_type} to {len(results)} nodes"
152 f"{f' (tier={tier_filter})' if tier_filter else ''}"
153 )
154 return results
156 @staticmethod
157 def get_pending_commands(db, node_id: str) -> List[Dict]:
158 """Get pending commands for a node. Called by gossip handler.
160 Verifies each command's issuer exists in PeerNode with authority.
161 Marks verified commands as 'delivered', unverified as 'rejected'.
163 Args:
164 db: SQLAlchemy session.
165 node_id: The requesting node's ID.
167 Returns:
168 List of command dicts (only verified ones).
169 """
170 from .models import FleetCommand
172 pending = db.query(FleetCommand).filter(
173 FleetCommand.target_node_id == node_id,
174 FleetCommand.status == 'pending',
175 ).order_by(FleetCommand.created_at.asc()).all()
177 results = []
178 for cmd in pending:
179 if not _verify_issuer(db, cmd.issued_by):
180 cmd.status = 'rejected'
181 logger.warning(
182 f"Fleet: rejected cmd {cmd.id} from unverified issuer "
183 f"{cmd.issued_by[:8] if cmd.issued_by else '?'}...")
184 continue
185 cmd.status = 'delivered'
186 cmd.delivered_at = time.time()
187 results.append(cmd.to_dict())
189 if results or any(c.status == 'rejected' for c in pending):
190 db.flush()
191 if results:
192 logger.info(f"Fleet: delivered {len(results)} commands to {node_id[:8]}...")
194 return results
196 @staticmethod
197 def ack_command(db, command_id: int, node_id: str, success: bool = True,
198 result_message: str = '') -> Optional[Dict]:
199 """Acknowledge command execution by the target node.
201 Args:
202 db: SQLAlchemy session.
203 command_id: The FleetCommand ID.
204 node_id: The acknowledging node's ID.
205 success: Whether the command executed successfully.
206 result_message: Optional result or error message.
208 Returns:
209 Updated command dict, or None if not found.
210 """
211 from .models import FleetCommand
213 cmd = db.query(FleetCommand).filter(
214 FleetCommand.id == command_id,
215 FleetCommand.target_node_id == node_id,
216 ).first()
218 if not cmd:
219 return None
221 cmd.status = 'completed' if success else 'failed'
222 cmd.result_message = result_message
223 cmd.completed_at = time.time()
224 db.flush()
226 logger.info(
227 f"Fleet: command {command_id} {'completed' if success else 'failed'}"
228 f" on node {node_id[:8]}..."
229 )
230 return cmd.to_dict()
232 @staticmethod
233 def execute_command(cmd_type: str, params: dict) -> Dict:
234 """Execute a fleet command locally on this node.
236 Called by the embedded main loop when commands are received.
237 Does NOT require a db session - executes in-process.
239 Args:
240 cmd_type: The command type to execute.
241 params: Command parameters.
243 Returns:
244 {success: bool, message: str}
245 """
246 try:
247 if cmd_type == 'config_update':
248 return _execute_config_update(params)
249 elif cmd_type == 'halt':
250 return _execute_halt(params)
251 elif cmd_type == 'restart':
252 return _execute_restart(params)
253 elif cmd_type == 'sensor_config':
254 return _execute_sensor_config(params)
255 elif cmd_type == 'goal_assign':
256 return _execute_goal_assign(params)
257 elif cmd_type == 'firmware_update':
258 return _execute_firmware_update(params)
259 elif cmd_type == 'tts_stream':
260 return _execute_tts_stream(params)
261 elif cmd_type == 'agent_consent':
262 return _execute_agent_consent(params)
263 elif cmd_type == 'estop':
264 return _execute_estop(params)
265 elif cmd_type == 'estop_clear':
266 return _execute_estop_clear(params)
267 elif cmd_type == 'tier_promote':
268 return _execute_tier_promote(params)
269 elif cmd_type == 'tier_demote':
270 return _execute_tier_demote(params)
271 elif cmd_type == 'device_control':
272 return _execute_device_control(params)
273 else:
274 return {'success': False, 'message': f'Unknown command: {cmd_type}'}
275 except Exception as e:
276 logger.error(f"Fleet: command {cmd_type} failed: {e}")
277 return {'success': False, 'message': str(e)}
279 @staticmethod
280 def verify_command_signature(cmd_dict: dict) -> bool:
281 """Verify a command was signed by an authorized node (central/regional).
283 Args:
284 cmd_dict: Command dict with 'signature' and 'issued_by' fields.
286 Returns:
287 True if signature is valid and issuer is authorized.
288 """
289 signature = cmd_dict.get('signature', '')
290 issued_by = cmd_dict.get('issued_by', '')
292 if not signature or not issued_by:
293 return False
295 try:
296 from security.key_delegation import verify_tier_authorization
297 # Central and regional nodes are authorized to issue commands
298 return verify_tier_authorization(issued_by, required_tier='regional')
299 except ImportError:
300 # If key_delegation unavailable, verify via guardrail hash
301 try:
302 from security.hive_guardrails import verify_guardrail_integrity
303 return verify_guardrail_integrity()
304 except ImportError:
305 return False
308# ═══════════════════════════════════════════════════════════════
309# Issuer verification
310# ═══════════════════════════════════════════════════════════════
312def _verify_issuer(db, issued_by: str) -> bool:
313 """Check that a fleet command issuer exists in PeerNode and has authority.
315 Self-issued commands are always valid. External issuers must be
316 known, active, and have central or regional tier.
317 """
318 if not issued_by or issued_by == 'unknown':
319 return True # No issuer info = legacy/local command
320 self_id = _get_self_node_id()
321 if issued_by == self_id:
322 return True
323 try:
324 from .models import PeerNode
325 peer = db.query(PeerNode).filter_by(node_id=issued_by).first()
326 if not peer:
327 return False
328 if peer.status in ('dead', 'banned'):
329 return False
330 if peer.tier not in ('central', 'regional'):
331 return False
332 return True
333 except Exception:
334 return True # DB error = fail open for availability
337# ═══════════════════════════════════════════════════════════════
338# Command executors (local, in-process)
339# ═══════════════════════════════════════════════════════════════
341def _execute_config_update(params: dict) -> Dict:
342 """Apply config/env var updates pushed by central."""
343 updates = params.get('env_vars', {})
344 if not updates:
345 return {'success': False, 'message': 'No env_vars in params'}
347 applied = []
348 for key, value in updates.items():
349 # Security: never allow overwriting master key or guardrail vars
350 if 'MASTER' in key.upper() or 'GUARDRAIL' in key.upper():
351 logger.warning(f"Fleet: rejected config_update for protected var {key}")
352 continue
353 os.environ[key] = str(value)
354 applied.append(key)
356 return {'success': True, 'message': f'Applied {len(applied)} config updates: {applied}'}
359def _execute_halt(params: dict) -> Dict:
360 """Emergency halt - sets halt flag for the main loop.
362 Note: HiveCircuitBreaker.halt_network() requires master key signature
363 and is reserved for the steward. Fleet halt uses a process-level flag
364 that the main loop checks.
365 """
366 reason = params.get('reason', 'Central commanded halt')
367 logger.critical(f"Fleet: HALT received - {reason}")
368 os.environ['HEVOLVE_HALTED'] = 'true'
369 os.environ['HEVOLVE_HALT_REASON'] = reason
370 return {'success': True, 'message': f'Halt flag set: {reason}'}
373def _execute_restart(params: dict) -> Dict:
374 """Restart node services (not the OS)."""
375 target = params.get('target', 'all') # 'all', 'gossip', 'daemon', 'vision'
376 logger.info(f"Fleet: restart requested for {target}")
377 # Set restart flag - main loop checks this
378 os.environ['HEVOLVE_RESTART_REQUESTED'] = target
379 return {'success': True, 'message': f'Restart requested: {target}'}
382def _execute_sensor_config(params: dict) -> Dict:
383 """Configure sensor polling intervals and pin assignments."""
384 applied = []
385 if 'poll_interval_ms' in params:
386 os.environ['HEVOLVE_SENSOR_POLL_MS'] = str(params['poll_interval_ms'])
387 applied.append('poll_interval_ms')
388 if 'gpio_pins' in params:
389 os.environ['HEVOLVE_GPIO_PINS'] = json.dumps(params['gpio_pins'])
390 applied.append('gpio_pins')
391 if 'mqtt_topics' in params:
392 os.environ['HEVOLVE_MQTT_TOPICS'] = json.dumps(params['mqtt_topics'])
393 applied.append('mqtt_topics')
395 return {'success': True, 'message': f'Sensor config applied: {applied}'}
398def _execute_goal_assign(params: dict) -> Dict:
399 """Queue a goal for this node's daemon to pick up."""
400 goal_type = params.get('goal_type', '')
401 title = params.get('title', '')
402 if not goal_type or not title:
403 return {'success': False, 'message': 'goal_type and title required'}
405 # Store as pending goal for daemon
406 os.environ['HEVOLVE_PENDING_GOAL'] = json.dumps(params)
407 return {'success': True, 'message': f'Goal queued: {goal_type}/{title}'}
410def _execute_firmware_update(params: dict) -> Dict:
411 """Trigger code/firmware update from a signed release."""
412 update_url = params.get('update_url', '')
413 release_hash = params.get('release_hash', '')
414 if not update_url or not release_hash:
415 return {'success': False, 'message': 'update_url and release_hash required'}
417 # Set update flag - main loop handles the actual update
418 os.environ['HEVOLVE_PENDING_UPDATE'] = json.dumps({
419 'url': update_url,
420 'hash': release_hash,
421 'requested_at': time.time(),
422 })
423 return {'success': True, 'message': f'Firmware update queued: {release_hash[:16]}...'}
426def _execute_tts_stream(params: dict) -> Dict:
427 """Stream TTS to this device or relay to a paired device.
429 Params:
430 text: Text to speak.
431 voice: Voice ID (default 'default').
432 lang: Language code (default 'en').
433 relay_to_device_id: If set, this device should relay audio to that device.
434 """
435 text = params.get('text', '')
436 if not text:
437 return {'success': False, 'message': 'No text provided'}
439 relay_to = params.get('relay_to_device_id', '')
441 # Set env flags for the local TTS/relay loop to pick up
442 os.environ['HEVOLVE_TTS_PENDING'] = json.dumps({
443 'text': text,
444 'voice': params.get('voice', 'default'),
445 'lang': params.get('lang', 'en'),
446 'relay_to_device_id': relay_to,
447 'agent_id': params.get('agent_id', ''),
448 'requested_at': time.time(),
449 })
450 action = f"relay to {relay_to[:8]}..." if relay_to else "local playback"
451 return {'success': True, 'message': f'TTS queued: {action}'}
454def _execute_agent_consent(params: dict) -> Dict:
455 """Display consent prompt for an agent action.
457 Params:
458 action: What the agent wants to do.
459 agent_id: Which agent is requesting.
460 description: Human-readable explanation.
461 timeout_s: How long to wait for response (default 60).
462 """
463 action = params.get('action', '')
464 if not action:
465 return {'success': False, 'message': 'No action specified'}
467 os.environ['HEVOLVE_CONSENT_PENDING'] = json.dumps({
468 'action': action,
469 'agent_id': params.get('agent_id', ''),
470 'description': params.get('description', ''),
471 'timeout_s': params.get('timeout_s', 60),
472 'requested_at': time.time(),
473 })
474 return {'success': True, 'message': f'Consent requested: {action}'}
477def _execute_estop(params: dict) -> Dict:
478 """Trigger E-stop via fleet command from central.
480 Uses SafetyMonitor for proper E-stop with audit trail.
481 Falls back to simple halt flag if robotics module unavailable.
482 """
483 reason = params.get('reason', 'Central commanded E-stop')
484 try:
485 from integrations.robotics.safety_monitor import get_safety_monitor
486 monitor = get_safety_monitor()
487 monitor.trigger_estop(reason, source='fleet')
488 return {'success': True, 'message': f'E-stop triggered: {reason}'}
489 except ImportError:
490 # Fallback: use simple halt flag
491 os.environ['HEVOLVE_HALTED'] = 'true'
492 os.environ['HEVOLVE_HALT_REASON'] = f'E-STOP: {reason}'
493 return {'success': True, 'message': f'E-stop (fallback halt): {reason}'}
496def _execute_estop_clear(params: dict) -> Dict:
497 """Clear E-stop via fleet command. Requires human operator_id.
499 The operator_id in params must identify a human, not an agent.
500 """
501 operator_id = params.get('operator_id', '')
502 if not operator_id:
503 return {'success': False, 'message': 'E-stop clear requires operator_id'}
505 try:
506 from integrations.robotics.safety_monitor import get_safety_monitor
507 monitor = get_safety_monitor()
508 cleared = monitor.clear_estop(operator_id)
509 if cleared:
510 return {'success': True, 'message': f'E-stop cleared by {operator_id}'}
511 return {'success': False, 'message': 'E-stop clear rejected (agent or empty operator)'}
512 except ImportError:
513 # Fallback: clear halt flag
514 os.environ.pop('HEVOLVE_HALTED', None)
515 os.environ.pop('HEVOLVE_HALT_REASON', None)
516 return {'success': True, 'message': f'E-stop cleared (fallback): {operator_id}'}
519def _execute_tier_promote(params: dict) -> Dict:
520 """Promote this node to a higher tier (e.g., flat → regional).
522 Sets HEVOLVE_NODE_TIER env var, regenerates HART node identity,
523 and flags for auto-restart so all services reload with new tier.
524 """
525 new_tier = params.get('new_tier', '')
526 if new_tier not in ('regional', 'central'):
527 return {'success': False, 'message': f'Invalid promotion tier: {new_tier}'}
529 # Apply env vars
530 env_vars = params.get('env_vars', {})
531 for key, value in env_vars.items():
532 if 'MASTER' not in key.upper() and 'GUARDRAIL' not in key.upper():
533 os.environ[key] = str(value)
535 os.environ['HEVOLVE_NODE_TIER'] = new_tier
537 # Regenerate HART node identity for new tier
538 try:
539 from hart_onboarding import generate_node_identity
540 central_element = os.environ.get('HART_CENTRAL_ELEMENT', '')
541 generate_node_identity(
542 tier=new_tier,
543 central_element=central_element or None,
544 )
545 except Exception as e:
546 logger.warning(f"Fleet: HART identity regeneration failed: {e}")
548 # Flag for auto-restart
549 if params.get('restart_required', True):
550 os.environ['HEVOLVE_RESTART_REQUESTED'] = 'all'
551 os.environ['HEVOLVE_RESTART_REASON'] = f'Promoted to {new_tier}'
553 logger.info(f"Fleet: node promoted to {new_tier}")
554 return {
555 'success': True,
556 'message': f'Promoted to {new_tier}. Restart flagged.',
557 }
560def _execute_tier_demote(params: dict) -> Dict:
561 """Demote this node to a lower tier (e.g., regional → flat).
563 Clears tier env vars, removes HART node identity for old tier,
564 and flags for auto-restart.
565 """
566 new_tier = params.get('new_tier', 'flat')
567 reason = params.get('reason', 'Demoted by central')
569 # Apply env vars
570 env_vars = params.get('env_vars', {})
571 for key, value in env_vars.items():
572 if 'MASTER' not in key.upper() and 'GUARDRAIL' not in key.upper():
573 os.environ[key] = str(value)
575 os.environ['HEVOLVE_NODE_TIER'] = new_tier
577 # Clear regional-specific env vars
578 os.environ.pop('HART_REGIONAL_SPIRIT', None)
580 # Remove HART node identity file (will regenerate as flat on restart)
581 try:
582 from hart_onboarding import _identity_path
583 path = _identity_path()
584 if os.path.isfile(path):
585 os.remove(path)
586 logger.info("Fleet: HART node identity cleared for demotion")
587 except Exception as e:
588 logger.debug(f"Fleet: HART identity clear failed: {e}")
590 # Flag for auto-restart
591 if params.get('restart_required', True):
592 os.environ['HEVOLVE_RESTART_REQUESTED'] = 'all'
593 os.environ['HEVOLVE_RESTART_REASON'] = f'Demoted to {new_tier}: {reason}'
595 logger.info(f"Fleet: node demoted to {new_tier} ({reason})")
596 return {
597 'success': True,
598 'message': f'Demoted to {new_tier}. Restart flagged. Reason: {reason}',
599 }
602def _execute_device_control(params: dict) -> Dict:
603 """Execute a device control action locally on this node.
605 Routes the action string to the appropriate local subsystem:
606 - GPIO pin control (on/off/pwm)
607 - Serial port write
608 - Shell terminal exec (file listing, process management, etc.)
610 Params:
611 action: Natural language or structured command string.
612 category: Optional hint — 'gpio', 'serial', 'shell', or auto-detected.
613 """
614 action = params.get('action', '')
615 if not action:
616 return {'success': False, 'message': 'No action provided'}
618 category = params.get('category', '')
619 action_lower = action.lower()
621 # Auto-detect category from action text
622 if not category:
623 if any(kw in action_lower for kw in ('gpio', 'pin', 'relay', 'led', 'light')):
624 category = 'gpio'
625 elif any(kw in action_lower for kw in ('serial', 'uart', 'tty')):
626 category = 'serial'
627 else:
628 category = 'shell'
630 if category == 'gpio':
631 return _device_control_gpio(action, params)
632 elif category == 'serial':
633 return _device_control_serial(action, params)
634 else:
635 return _device_control_shell(action, params)
638def _device_control_gpio(action: str, params: dict) -> Dict:
639 """GPIO pin control via the existing GPIOAdapter."""
640 try:
641 pin = params.get('pin')
642 value = params.get('value', '')
643 if pin is None:
644 # Try to extract pin number from action text
645 import re
646 m = re.search(r'pin\s*(\d+)', action, re.IGNORECASE)
647 if m:
648 pin = int(m.group(1))
649 else:
650 return {'success': False, 'message': 'No GPIO pin specified'}
652 # Use gpiod/RPi.GPIO through existing env configuration
653 gpio_state = 'on' if any(kw in action.lower() for kw in ('on', 'high', 'enable')) else 'off'
654 if value:
655 gpio_state = value
657 os.environ['HEVOLVE_DEVICE_CONTROL_RESULT'] = json.dumps({
658 'type': 'gpio', 'pin': pin, 'state': gpio_state,
659 'requested_at': time.time(),
660 })
661 return {'success': True, 'message': f'GPIO pin {pin} set to {gpio_state}'}
662 except Exception as e:
663 return {'success': False, 'message': f'GPIO control failed: {e}'}
666def _device_control_serial(action: str, params: dict) -> Dict:
667 """Serial port write via the existing SerialAdapter pattern."""
668 port = params.get('port', os.environ.get('HEVOLVE_SERIAL_PORT', '/dev/ttyUSB0'))
669 try:
670 os.environ['HEVOLVE_DEVICE_CONTROL_RESULT'] = json.dumps({
671 'type': 'serial', 'port': port, 'action': action,
672 'requested_at': time.time(),
673 })
674 return {'success': True, 'message': f'Serial command queued on {port}: {action[:100]}'}
675 except Exception as e:
676 return {'success': False, 'message': f'Serial control failed: {e}'}
679def _device_control_shell(action: str, params: dict) -> Dict:
680 """Shell command execution for general device actions.
682 Uses a restricted command set for safety. Destructive commands are
683 classified via the action_classifier if available.
684 """
685 import shlex
686 import subprocess
688 # Extract command — support both 'run command <cmd>' and raw command
689 command = action
690 for prefix in ('run command ', 'run ', 'execute ', 'exec '):
691 if action.lower().startswith(prefix):
692 command = action[len(prefix):]
693 break
695 # Safety: classify destructive actions
696 try:
697 from security.action_classifier import classify_action
698 classification = classify_action(command)
699 if classification == 'destructive':
700 return {'success': False, 'message': f'Action classified as destructive, requires approval: {command[:100]}'}
701 except ImportError:
702 pass
704 try:
705 # G7: use shlex.split + shell=False to prevent shell injection
706 cmd_list = shlex.split(command)
707 result = subprocess.run(
708 cmd_list, shell=False, capture_output=True, text=True, timeout=30,
709 )
710 output = result.stdout[:2000] if result.stdout else ''
711 error = result.stderr[:500] if result.stderr else ''
712 return {
713 'success': result.returncode == 0,
714 'message': output or error or f'Exit code: {result.returncode}',
715 }
716 except subprocess.TimeoutExpired:
717 return {'success': False, 'message': 'Command timed out after 30s'}
718 except Exception as e:
719 return {'success': False, 'message': f'Shell execution failed: {e}'}
722# ═══════════════════════════════════════════════════════════════
723# Helpers
724# ═══════════════════════════════════════════════════════════════
726def _get_self_node_id() -> str:
727 """Get this node's ID (public key hex prefix)."""
728 try:
729 from security.node_integrity import get_public_key_hex
730 return get_public_key_hex()[:16]
731 except Exception:
732 return 'unknown'
735def _sign_command(cmd_type: str, params: dict, target_node_id: str) -> str:
736 """Sign a command with this node's private key."""
737 try:
738 from security.node_integrity import sign_message
739 message = f"{cmd_type}:{json.dumps(params, sort_keys=True)}:{target_node_id}"
740 return sign_message(message.encode())
741 except (ImportError, Exception) as e:
742 logger.debug(f"Fleet: command signing unavailable: {e}")
743 return ''