Coverage for integrations / channels / hardware / ros_bridge.py: 51.3%

113 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2ROS 2 Bridge Channel Adapter — Subscribe/publish to ROS 2 topics. 

3 

4Bridges ROS 2 robots to HART agents. Subscribes to String and Image topics, 

5publishes agent responses back to ROS 2 topics. 

6 

7Only loaded when HEVOLVE_ROS_BRIDGE_ENABLED=true (rclpy pulls ~500MB deps). 

8 

9Usage: 

10 HEVOLVE_ROS_BRIDGE_ENABLED=true python embedded_main.py 

11 

12 from integrations.channels.hardware.ros_bridge import ROSBridgeAdapter 

13 adapter = ROSBridgeAdapter( 

14 subscribe_topics=['/hart/input', '/camera/image_raw'], 

15 publish_topic='/hart/output', 

16 ) 

17 adapter.on_message(handler) 

18 await adapter.start() 

19""" 

20import asyncio 

21import base64 

22import logging 

23import os 

24import threading 

25import time 

26import uuid 

27from typing import Callable, Dict, List, Optional, Any 

28 

29from integrations.channels.base import ( 

30 ChannelAdapter, ChannelConfig, ChannelStatus, 

31 Message, SendResult, 

32) 

33 

34logger = logging.getLogger(__name__) 

35 

36 

37class ROSBridgeAdapter(ChannelAdapter): 

38 """Channel adapter for ROS 2 topic pub/sub. 

39 

40 String topics → Message events (text). 

41 Image topics → Message events (text = base64 frame, raw = metadata). 

42 send_message → publishes String to publish_topic. 

43 """ 

44 

45 def __init__( 

46 self, 

47 subscribe_topics: List[str] = None, 

48 publish_topic: str = '', 

49 node_name: str = '', 

50 frame_store=None, 

51 config: ChannelConfig = None, 

52 ): 

53 super().__init__(config or ChannelConfig()) 

54 self._subscribe_topics = subscribe_topics or _parse_topic_list( 

55 os.environ.get('HEVOLVE_ROS_TOPICS', '/hart/input')) 

56 self._publish_topic = publish_topic or os.environ.get( 

57 'HEVOLVE_ROS_PUBLISH_TOPIC', '/hart/output') 

58 self._node_name = node_name or os.environ.get( 

59 'HEVOLVE_ROS_NODE_NAME', 'hart_bridge') 

60 self._frame_store = frame_store # Optional: inject frames into FrameStore 

61 self._node = None 

62 self._publisher = None 

63 self._subscriptions = [] 

64 self._spin_thread = None 

65 

66 @property 

67 def name(self) -> str: 

68 return 'ros' 

69 

70 async def connect(self) -> bool: 

71 """Initialize ROS 2 node, create subscriptions and publisher.""" 

72 try: 

73 import rclpy 

74 from rclpy.node import Node 

75 except ImportError: 

76 logger.error("ROS bridge: rclpy not installed") 

77 return False 

78 

79 try: 

80 # Initialize ROS 2 if not already 

81 if not rclpy.ok(): 

82 rclpy.init() 

83 

84 self._node = rclpy.create_node(self._node_name) 

85 

86 # Create publisher for agent responses 

87 from std_msgs.msg import String 

88 self._publisher = self._node.create_publisher( 

89 String, self._publish_topic, 10) 

90 

91 # Subscribe to configured topics 

92 for topic in self._subscribe_topics: 

93 if _is_image_topic(topic): 

94 self._subscribe_image(topic) 

95 else: 

96 self._subscribe_string(topic) 

97 

98 # Spin in background thread 

99 self._spin_thread = threading.Thread( 

100 target=self._spin_loop, daemon=True) 

101 self._spin_thread.start() 

102 

103 self.status = ChannelStatus.CONNECTED 

104 logger.info( 

105 f"ROS bridge: node='{self._node_name}', " 

106 f"sub={self._subscribe_topics}, pub={self._publish_topic}" 

107 ) 

108 return True 

109 except Exception as e: 

110 logger.error(f"ROS bridge: init failed: {e}") 

111 self.status = ChannelStatus.ERROR 

112 return False 

113 

114 async def disconnect(self) -> None: 

115 """Shutdown ROS 2 node.""" 

116 self._running = False 

117 if self._node: 

118 try: 

119 self._node.destroy_node() 

120 except Exception: 

121 pass 

122 try: 

123 import rclpy 

124 if rclpy.ok(): 

125 rclpy.shutdown() 

126 except Exception: 

127 pass 

128 

129 async def send_message( 

130 self, chat_id: str, text: str, 

131 reply_to: Optional[str] = None, 

132 media: Optional[List] = None, 

133 buttons: Optional[List[Dict]] = None, 

134 ) -> SendResult: 

135 """Publish a String message to ROS 2 topic. 

136 

137 chat_id: topic to publish to (defaults to self._publish_topic) 

138 text: message payload 

139 """ 

140 if not self._node or not self._publisher: 

141 return SendResult(success=False, error="ROS node not initialized") 

142 

143 try: 

144 from std_msgs.msg import String 

145 msg = String() 

146 msg.data = text 

147 

148 # If chat_id matches publish topic, use existing publisher 

149 topic = chat_id or self._publish_topic 

150 if topic == self._publish_topic: 

151 self._publisher.publish(msg) 

152 else: 

153 # Create a one-shot publisher for different topic 

154 pub = self._node.create_publisher(String, topic, 10) 

155 pub.publish(msg) 

156 # Cleanup after brief delay 

157 self._node.create_timer(1.0, lambda: self._node.destroy_publisher(pub)) 

158 

159 return SendResult(success=True, message_id=f"ros_{uuid.uuid4().hex[:8]}") 

160 except Exception as e: 

161 logger.error(f"ROS publish error: {e}") 

162 return SendResult(success=False, error=str(e)) 

163 

164 async def edit_message(self, chat_id: str, message_id: str, 

165 text: str, buttons=None) -> SendResult: 

166 """ROS topics are streams — edit means re-publish.""" 

167 return await self.send_message(chat_id, text) 

168 

169 async def delete_message(self, chat_id: str, message_id: str) -> bool: 

170 return False # Not applicable 

171 

172 async def send_typing(self, chat_id: str) -> None: 

173 pass # Not applicable 

174 

175 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

176 return { 

177 'node_name': self._node_name, 

178 'subscribe_topics': self._subscribe_topics, 

179 'publish_topic': self._publish_topic, 

180 } 

181 

182 # ─── Internal ─── 

183 

184 def _subscribe_string(self, topic: str): 

185 """Subscribe to a String topic.""" 

186 try: 

187 from std_msgs.msg import String 

188 

189 def callback(msg): 

190 self._dispatch_ros_message(topic, msg.data) 

191 

192 sub = self._node.create_subscription(String, topic, callback, 10) 

193 self._subscriptions.append(sub) 

194 logger.debug(f"ROS bridge: subscribed to String topic {topic}") 

195 except Exception as e: 

196 logger.error(f"ROS bridge: failed to subscribe to {topic}: {e}") 

197 

198 def _subscribe_image(self, topic: str): 

199 """Subscribe to an Image topic (sensor_msgs/Image).""" 

200 try: 

201 from sensor_msgs.msg import Image 

202 

203 def callback(msg): 

204 self._handle_image_message(topic, msg) 

205 

206 sub = self._node.create_subscription(Image, topic, callback, 10) 

207 self._subscriptions.append(sub) 

208 logger.debug(f"ROS bridge: subscribed to Image topic {topic}") 

209 except ImportError: 

210 logger.warning( 

211 f"ROS bridge: sensor_msgs not available, " 

212 f"skipping image topic {topic}") 

213 except Exception as e: 

214 logger.error(f"ROS bridge: failed to subscribe to {topic}: {e}") 

215 

216 def _handle_image_message(self, topic: str, img_msg): 

217 """Process ROS Image message — inject into FrameStore or dispatch.""" 

218 frame_data = bytes(img_msg.data) 

219 

220 # Inject into FrameStore if available 

221 if self._frame_store: 

222 try: 

223 self._frame_store.put_frame( 

224 user_id=f'ros:{topic}', 

225 frame_data=frame_data, 

226 ) 

227 except Exception as e: 

228 logger.debug(f"ROS bridge: FrameStore inject failed: {e}") 

229 

230 # Also dispatch as Message with metadata 

231 self._dispatch_ros_message( 

232 topic, 

233 text=f'image:{img_msg.width}x{img_msg.height}:{img_msg.encoding}', 

234 raw={ 

235 'width': img_msg.width, 

236 'height': img_msg.height, 

237 'encoding': img_msg.encoding, 

238 'step': img_msg.step, 

239 'frame_size': len(frame_data), 

240 }, 

241 ) 

242 

243 def _dispatch_ros_message(self, topic: str, text: str, 

244 raw: Optional[Dict] = None): 

245 """Create Message from ROS data and dispatch to handlers.""" 

246 msg = Message( 

247 id=str(uuid.uuid4())[:8], 

248 channel='ros', 

249 sender_id=f'ros:{topic}', 

250 sender_name=f'ROS {topic}', 

251 chat_id=topic, 

252 text=text, 

253 raw=raw or {'topic': topic}, 

254 ) 

255 

256 for handler in self._message_handlers: 

257 try: 

258 handler(msg) 

259 except Exception as e: 

260 logger.error(f"ROS handler error: {e}") 

261 

262 def _spin_loop(self): 

263 """Background thread: spin ROS 2 node to process callbacks.""" 

264 try: 

265 import rclpy 

266 while self._running and rclpy.ok() and self._node: 

267 rclpy.spin_once(self._node, timeout_sec=0.1) 

268 except Exception as e: 

269 if self._running: 

270 logger.error(f"ROS spin loop error: {e}") 

271 

272 

273def _parse_topic_list(env_value: str) -> List[str]: 

274 """Parse comma-separated ROS topic names from env var.""" 

275 if not env_value: 

276 return ['/hart/input'] 

277 return [t.strip() for t in env_value.split(',') if t.strip()] 

278 

279 

280def _is_image_topic(topic: str) -> bool: 

281 """Heuristic: detect if a topic carries Image messages.""" 

282 image_keywords = ['image', 'camera', 'rgb', 'depth', 'frame'] 

283 topic_lower = topic.lower() 

284 return any(kw in topic_lower for kw in image_keywords)