Coverage for integrations / robotics / safety_tools.py: 82.0%
50 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Safety Tools — AutoGen tools for robot safety management.
44 tools for the unified agent goal engine:
5 - configure_workspace_limits
6 - get_safety_status
7 - test_estop
8 - configure_estop_sources
9"""
10import json
11import logging
12from typing import Dict
14logger = logging.getLogger('hevolve_robotics')
17def configure_workspace_limits(limits_json: str) -> str:
18 """Set operational domain bounds for the robot.
20 Args:
21 limits_json: JSON string with axis limits, e.g.
22 '{"x": [-1.0, 1.0], "y": [-0.5, 0.5], "z": [0.0, 1.2],
23 "joint_limits": {"joint_0": [-90, 90]}}'
25 Returns:
26 Status message.
27 """
28 try:
29 limits = json.loads(limits_json)
30 except (json.JSONDecodeError, TypeError) as e:
31 return f"Error: invalid JSON — {e}"
33 from integrations.robotics.safety_monitor import get_safety_monitor
34 monitor = get_safety_monitor()
35 monitor.register_workspace_limits(limits)
36 return f"Workspace limits configured: {list(limits.keys())}"
39def get_safety_status() -> str:
40 """Query current safety state including E-stop status and workspace limits.
42 Returns:
43 JSON string with full safety status.
44 """
45 from integrations.robotics.safety_monitor import get_safety_monitor
46 monitor = get_safety_monitor()
47 status = monitor.get_safety_status()
48 return json.dumps(status, indent=2, default=str)
51def test_estop(confirm: str = 'false') -> str:
52 """Trigger and immediately clear a test E-stop.
54 Only works when confirm='true'. Used for safety verification.
56 Args:
57 confirm: Must be 'true' to execute the test.
59 Returns:
60 Status message.
61 """
62 if confirm.lower() != 'true':
63 return "Test E-stop not executed. Pass confirm='true' to proceed."
65 from integrations.robotics.safety_monitor import get_safety_monitor
66 monitor = get_safety_monitor()
68 # Trigger
69 monitor.trigger_estop('Safety test — automatic clear follows', source='test')
71 # Immediately clear (test operator)
72 cleared = monitor.clear_estop('test_operator_safety_check')
74 if cleared:
75 return "Test E-stop: triggered and cleared successfully. Safety system operational."
76 else:
77 return "Test E-stop: triggered but clear FAILED. Manual intervention required."
80def configure_estop_sources(config_json: str) -> str:
81 """Register GPIO pins and/or serial ports as E-stop sources.
83 Args:
84 config_json: JSON string, e.g.
85 '{"gpio_pins": [17, 27], "serial": [{"port": "/dev/ttyUSB0", "pattern": "ESTOP"}]}'
87 Returns:
88 Status message.
89 """
90 try:
91 config = json.loads(config_json)
92 except (json.JSONDecodeError, TypeError) as e:
93 return f"Error: invalid JSON — {e}"
95 from integrations.robotics.safety_monitor import get_safety_monitor
96 monitor = get_safety_monitor()
98 registered = []
100 for pin in config.get('gpio_pins', []):
101 monitor.register_estop_pin(int(pin))
102 registered.append(f'GPIO pin {pin}')
104 for serial_cfg in config.get('serial', []):
105 port = serial_cfg.get('port', '')
106 pattern = serial_cfg.get('pattern', 'ESTOP')
107 if port:
108 monitor.register_estop_serial(port, pattern)
109 registered.append(f'Serial {port} (pattern={pattern})')
111 if registered:
112 monitor.start() # Start monitor if not already running
113 return f"E-stop sources registered: {', '.join(registered)}"
114 return "No E-stop sources configured."
117# Tool metadata for AutoGen registration
118SAFETY_TOOLS = [
119 {
120 'name': 'configure_workspace_limits',
121 'func': configure_workspace_limits,
122 'description': (
123 'Set operational domain bounds for the robot. '
124 'Input: JSON with axis limits {x: [min, max], y: [...], z: [...], '
125 'joint_limits: {name: [min, max]}}.'
126 ),
127 },
128 {
129 'name': 'get_safety_status',
130 'func': get_safety_status,
131 'description': (
132 'Query current robot safety state: E-stop status, workspace limits, '
133 'audit trail, monitor status.'
134 ),
135 },
136 {
137 'name': 'test_estop',
138 'func': test_estop,
139 'description': (
140 'Trigger and immediately clear a test E-stop to verify safety system. '
141 "Pass confirm='true' to execute."
142 ),
143 },
144 {
145 'name': 'configure_estop_sources',
146 'func': configure_estop_sources,
147 'description': (
148 'Register GPIO pins and/or serial ports as hardware E-stop sources. '
149 'Input: JSON with gpio_pins and/or serial arrays.'
150 ),
151 },
152]