Coverage for integrations / robotics / safety_tools.py: 82.0%

50 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Safety Tools — AutoGen tools for robot safety management. 

3 

44 tools for the unified agent goal engine: 

5 - configure_workspace_limits 

6 - get_safety_status 

7 - test_estop 

8 - configure_estop_sources 

9""" 

10import json 

11import logging 

12from typing import Dict 

13 

14logger = logging.getLogger('hevolve_robotics') 

15 

16 

def configure_workspace_limits(limits_json: str) -> str:
    """Set operational domain bounds for the robot.

    Parses ``limits_json`` and passes the resulting mapping to the safety
    monitor's ``register_workspace_limits``.

    Args:
        limits_json: JSON string holding an object with axis limits, e.g.
            '{"x": [-1.0, 1.0], "y": [-0.5, 0.5], "z": [0.0, 1.2],
              "joint_limits": {"joint_0": [-90, 90]}}'

    Returns:
        Status message listing the configured keys, or a message starting
        with "Error:" when the input is not valid JSON or is not a JSON
        object. Nothing is registered in the error cases.
    """
    try:
        limits = json.loads(limits_json)
    except (json.JSONDecodeError, TypeError) as e:
        return f"Error: invalid JSON — {e}"

    # Valid JSON is not necessarily an object: a list, number or null would
    # otherwise reach the monitor and then fail on ``.keys()`` below.
    if not isinstance(limits, dict):
        return (
            "Error: workspace limits must be a JSON object, "
            f"got {type(limits).__name__}"
        )

    # Imported lazily, as elsewhere in this module, so the monitor module is
    # only loaded once there is something valid to register.
    from integrations.robotics.safety_monitor import get_safety_monitor
    monitor = get_safety_monitor()
    monitor.register_workspace_limits(limits)
    return f"Workspace limits configured: {list(limits.keys())}"

37 

38 

def get_safety_status() -> str:
    """Report the safety monitor's current status as pretty-printed JSON.

    Returns:
        Whatever the monitor's ``get_safety_status()`` returns, serialized
        with a two-space indent. Values the JSON encoder cannot handle
        natively are rendered through ``str``.
    """
    from integrations.robotics.safety_monitor import get_safety_monitor

    snapshot = get_safety_monitor().get_safety_status()
    return json.dumps(snapshot, default=str, indent=2)

49 

50 

def test_estop(confirm: str = 'false') -> str:
    """Trigger and immediately clear a test E-stop.

    Only runs when ``confirm`` normalizes to 'true'. Used for safety
    verification.

    Args:
        confirm: Must be 'true' (case-insensitive, surrounding whitespace
            ignored) to execute the test. A boolean ``True`` is accepted as
            well; any other value leaves the E-stop untouched.

    Returns:
        Status message: not executed, triggered and cleared, or triggered
        but the clear failed.
    """
    # Tool callers may hand over a real boolean or None instead of a string;
    # normalizing through str() keeps the guard from raising AttributeError.
    if str(confirm).strip().lower() != 'true':
        return "Test E-stop not executed. Pass confirm='true' to proceed."

    from integrations.robotics.safety_monitor import get_safety_monitor
    monitor = get_safety_monitor()

    # NOTE(review): the clear below is unconditional. If a genuine E-stop was
    # already active before this test, it is presumably cleared too — confirm
    # against the safety monitor's clear_estop semantics.
    monitor.trigger_estop('Safety test — automatic clear follows', source='test')

    # Clear straight away, identifying the caller as the test operator.
    cleared = monitor.clear_estop('test_operator_safety_check')

    if cleared:
        return "Test E-stop: triggered and cleared successfully. Safety system operational."
    return "Test E-stop: triggered but clear FAILED. Manual intervention required."


# The name matches pytest's ``test_*`` pattern, so importing this tool into a
# test module would get it collected as a test case. Opt out explicitly.
test_estop.__test__ = False

78 

79 

def _parse_gpio_pin(pin) -> int:
    """Convert one ``gpio_pins`` entry to an int; raise ValueError if it is not a whole number."""
    # bool is an int subclass and int() silently truncates floats; neither
    # should be accepted as a pin number.
    if isinstance(pin, bool) or (isinstance(pin, float) and not pin.is_integer()):
        raise ValueError(f"invalid GPIO pin {pin!r} — expected an integer")
    try:
        return int(pin)
    except (TypeError, ValueError):
        raise ValueError(f"invalid GPIO pin {pin!r} — expected an integer") from None


def configure_estop_sources(config_json: str) -> str:
    """Register GPIO pins and/or serial ports as E-stop sources.

    The whole configuration is validated before anything is registered, so a
    malformed entry cannot leave the monitor with a partial set of sources.

    Args:
        config_json: JSON string, e.g.
            '{"gpio_pins": [17, 27], "serial": [{"port": "/dev/ttyUSB0", "pattern": "ESTOP"}]}'
            Both keys are optional. Serial entries without a ``port`` are
            skipped; ``pattern`` defaults to 'ESTOP'.

    Returns:
        Status message listing the registered sources, "No E-stop sources
        configured." when there were none, or a message starting with
        "Error:" when the input is not valid JSON or has the wrong shape.
    """
    try:
        config = json.loads(config_json)
    except (json.JSONDecodeError, TypeError) as e:
        return f"Error: invalid JSON — {e}"

    if not isinstance(config, dict):
        return (
            "Error: E-stop config must be a JSON object, "
            f"got {type(config).__name__}"
        )

    # A missing key and an explicit null both mean "no sources of this kind".
    sections = {}
    for key in ('gpio_pins', 'serial'):
        entries = config.get(key)
        if entries is None:
            entries = []
        # A bare string would otherwise be iterated character by character
        # (e.g. "17" registering pins 1 and 7).
        if not isinstance(entries, list):
            return f"Error: '{key}' must be a JSON array"
        sections[key] = entries

    try:
        pins = [(pin, _parse_gpio_pin(pin)) for pin in sections['gpio_pins']]
    except ValueError as e:
        return f"Error: {e}"

    serial_sources = []
    for serial_cfg in sections['serial']:
        if not isinstance(serial_cfg, dict):
            return (
                f"Error: invalid serial entry {serial_cfg!r} — "
                "expected an object with 'port'"
            )
        port = serial_cfg.get('port', '')
        if port:
            serial_sources.append((port, serial_cfg.get('pattern', 'ESTOP')))

    from integrations.robotics.safety_monitor import get_safety_monitor
    monitor = get_safety_monitor()

    registered = []

    for raw_pin, pin_number in pins:
        monitor.register_estop_pin(pin_number)
        registered.append(f'GPIO pin {raw_pin}')

    for port, pattern in serial_sources:
        monitor.register_estop_serial(port, pattern)
        registered.append(f'Serial {port} (pattern={pattern})')

    if registered:
        monitor.start()  # Start monitor if not already running
        return f"E-stop sources registered: {', '.join(registered)}"
    return "No E-stop sources configured."

115 

116 

# Registry consumed when wiring these helpers into AutoGen: one entry per
# tool, each carrying its public name, the callable, and the description text.
SAFETY_TOOLS = [
    dict(
        name='configure_workspace_limits',
        func=configure_workspace_limits,
        description=(
            'Set operational domain bounds for the robot. '
            'Input: JSON with axis limits '
            '{x: [min, max], y: [...], z: [...], '
            'joint_limits: {name: [min, max]}}.'
        ),
    ),
    dict(
        name='get_safety_status',
        func=get_safety_status,
        description=(
            'Query current robot safety state: '
            'E-stop status, workspace limits, audit trail, monitor status.'
        ),
    ),
    dict(
        name='test_estop',
        func=test_estop,
        description=(
            'Trigger and immediately clear a test E-stop '
            'to verify safety system. '
            "Pass confirm='true' to execute."
        ),
    ),
    dict(
        name='configure_estop_sources',
        func=configure_estop_sources,
        description=(
            'Register GPIO pins and/or serial ports '
            'as hardware E-stop sources. '
            'Input: JSON with gpio_pins and/or serial arrays.'
        ),
    ),
]