Coverage for integrations / channels / hardware / ros_bridge.py: 51.3%
113 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2ROS 2 Bridge Channel Adapter — Subscribe/publish to ROS 2 topics.
4Bridges ROS 2 robots to HART agents. Subscribes to String and Image topics,
5publishes agent responses back to ROS 2 topics.
7Only loaded when HEVOLVE_ROS_BRIDGE_ENABLED=true (rclpy pulls ~500MB deps).
9Usage:
10 HEVOLVE_ROS_BRIDGE_ENABLED=true python embedded_main.py
12 from integrations.channels.hardware.ros_bridge import ROSBridgeAdapter
13 adapter = ROSBridgeAdapter(
14 subscribe_topics=['/hart/input', '/camera/image_raw'],
15 publish_topic='/hart/output',
16 )
17 adapter.on_message(handler)
18 await adapter.start()
19"""
20import asyncio
21import base64
22import logging
23import os
24import threading
25import time
26import uuid
27from typing import Callable, Dict, List, Optional, Any
29from integrations.channels.base import (
30 ChannelAdapter, ChannelConfig, ChannelStatus,
31 Message, SendResult,
32)
34logger = logging.getLogger(__name__)
37class ROSBridgeAdapter(ChannelAdapter):
38 """Channel adapter for ROS 2 topic pub/sub.
40 String topics → Message events (text).
41 Image topics → Message events (text = base64 frame, raw = metadata).
42 send_message → publishes String to publish_topic.
43 """
45 def __init__(
46 self,
47 subscribe_topics: List[str] = None,
48 publish_topic: str = '',
49 node_name: str = '',
50 frame_store=None,
51 config: ChannelConfig = None,
52 ):
53 super().__init__(config or ChannelConfig())
54 self._subscribe_topics = subscribe_topics or _parse_topic_list(
55 os.environ.get('HEVOLVE_ROS_TOPICS', '/hart/input'))
56 self._publish_topic = publish_topic or os.environ.get(
57 'HEVOLVE_ROS_PUBLISH_TOPIC', '/hart/output')
58 self._node_name = node_name or os.environ.get(
59 'HEVOLVE_ROS_NODE_NAME', 'hart_bridge')
60 self._frame_store = frame_store # Optional: inject frames into FrameStore
61 self._node = None
62 self._publisher = None
63 self._subscriptions = []
64 self._spin_thread = None
66 @property
67 def name(self) -> str:
68 return 'ros'
70 async def connect(self) -> bool:
71 """Initialize ROS 2 node, create subscriptions and publisher."""
72 try:
73 import rclpy
74 from rclpy.node import Node
75 except ImportError:
76 logger.error("ROS bridge: rclpy not installed")
77 return False
79 try:
80 # Initialize ROS 2 if not already
81 if not rclpy.ok():
82 rclpy.init()
84 self._node = rclpy.create_node(self._node_name)
86 # Create publisher for agent responses
87 from std_msgs.msg import String
88 self._publisher = self._node.create_publisher(
89 String, self._publish_topic, 10)
91 # Subscribe to configured topics
92 for topic in self._subscribe_topics:
93 if _is_image_topic(topic):
94 self._subscribe_image(topic)
95 else:
96 self._subscribe_string(topic)
98 # Spin in background thread
99 self._spin_thread = threading.Thread(
100 target=self._spin_loop, daemon=True)
101 self._spin_thread.start()
103 self.status = ChannelStatus.CONNECTED
104 logger.info(
105 f"ROS bridge: node='{self._node_name}', "
106 f"sub={self._subscribe_topics}, pub={self._publish_topic}"
107 )
108 return True
109 except Exception as e:
110 logger.error(f"ROS bridge: init failed: {e}")
111 self.status = ChannelStatus.ERROR
112 return False
114 async def disconnect(self) -> None:
115 """Shutdown ROS 2 node."""
116 self._running = False
117 if self._node:
118 try:
119 self._node.destroy_node()
120 except Exception:
121 pass
122 try:
123 import rclpy
124 if rclpy.ok():
125 rclpy.shutdown()
126 except Exception:
127 pass
129 async def send_message(
130 self, chat_id: str, text: str,
131 reply_to: Optional[str] = None,
132 media: Optional[List] = None,
133 buttons: Optional[List[Dict]] = None,
134 ) -> SendResult:
135 """Publish a String message to ROS 2 topic.
137 chat_id: topic to publish to (defaults to self._publish_topic)
138 text: message payload
139 """
140 if not self._node or not self._publisher:
141 return SendResult(success=False, error="ROS node not initialized")
143 try:
144 from std_msgs.msg import String
145 msg = String()
146 msg.data = text
148 # If chat_id matches publish topic, use existing publisher
149 topic = chat_id or self._publish_topic
150 if topic == self._publish_topic:
151 self._publisher.publish(msg)
152 else:
153 # Create a one-shot publisher for different topic
154 pub = self._node.create_publisher(String, topic, 10)
155 pub.publish(msg)
156 # Cleanup after brief delay
157 self._node.create_timer(1.0, lambda: self._node.destroy_publisher(pub))
159 return SendResult(success=True, message_id=f"ros_{uuid.uuid4().hex[:8]}")
160 except Exception as e:
161 logger.error(f"ROS publish error: {e}")
162 return SendResult(success=False, error=str(e))
164 async def edit_message(self, chat_id: str, message_id: str,
165 text: str, buttons=None) -> SendResult:
166 """ROS topics are streams — edit means re-publish."""
167 return await self.send_message(chat_id, text)
169 async def delete_message(self, chat_id: str, message_id: str) -> bool:
170 return False # Not applicable
172 async def send_typing(self, chat_id: str) -> None:
173 pass # Not applicable
175 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
176 return {
177 'node_name': self._node_name,
178 'subscribe_topics': self._subscribe_topics,
179 'publish_topic': self._publish_topic,
180 }
182 # ─── Internal ───
184 def _subscribe_string(self, topic: str):
185 """Subscribe to a String topic."""
186 try:
187 from std_msgs.msg import String
189 def callback(msg):
190 self._dispatch_ros_message(topic, msg.data)
192 sub = self._node.create_subscription(String, topic, callback, 10)
193 self._subscriptions.append(sub)
194 logger.debug(f"ROS bridge: subscribed to String topic {topic}")
195 except Exception as e:
196 logger.error(f"ROS bridge: failed to subscribe to {topic}: {e}")
198 def _subscribe_image(self, topic: str):
199 """Subscribe to an Image topic (sensor_msgs/Image)."""
200 try:
201 from sensor_msgs.msg import Image
203 def callback(msg):
204 self._handle_image_message(topic, msg)
206 sub = self._node.create_subscription(Image, topic, callback, 10)
207 self._subscriptions.append(sub)
208 logger.debug(f"ROS bridge: subscribed to Image topic {topic}")
209 except ImportError:
210 logger.warning(
211 f"ROS bridge: sensor_msgs not available, "
212 f"skipping image topic {topic}")
213 except Exception as e:
214 logger.error(f"ROS bridge: failed to subscribe to {topic}: {e}")
216 def _handle_image_message(self, topic: str, img_msg):
217 """Process ROS Image message — inject into FrameStore or dispatch."""
218 frame_data = bytes(img_msg.data)
220 # Inject into FrameStore if available
221 if self._frame_store:
222 try:
223 self._frame_store.put_frame(
224 user_id=f'ros:{topic}',
225 frame_data=frame_data,
226 )
227 except Exception as e:
228 logger.debug(f"ROS bridge: FrameStore inject failed: {e}")
230 # Also dispatch as Message with metadata
231 self._dispatch_ros_message(
232 topic,
233 text=f'image:{img_msg.width}x{img_msg.height}:{img_msg.encoding}',
234 raw={
235 'width': img_msg.width,
236 'height': img_msg.height,
237 'encoding': img_msg.encoding,
238 'step': img_msg.step,
239 'frame_size': len(frame_data),
240 },
241 )
243 def _dispatch_ros_message(self, topic: str, text: str,
244 raw: Optional[Dict] = None):
245 """Create Message from ROS data and dispatch to handlers."""
246 msg = Message(
247 id=str(uuid.uuid4())[:8],
248 channel='ros',
249 sender_id=f'ros:{topic}',
250 sender_name=f'ROS {topic}',
251 chat_id=topic,
252 text=text,
253 raw=raw or {'topic': topic},
254 )
256 for handler in self._message_handlers:
257 try:
258 handler(msg)
259 except Exception as e:
260 logger.error(f"ROS handler error: {e}")
262 def _spin_loop(self):
263 """Background thread: spin ROS 2 node to process callbacks."""
264 try:
265 import rclpy
266 while self._running and rclpy.ok() and self._node:
267 rclpy.spin_once(self._node, timeout_sec=0.1)
268 except Exception as e:
269 if self._running:
270 logger.error(f"ROS spin loop error: {e}")
273def _parse_topic_list(env_value: str) -> List[str]:
274 """Parse comma-separated ROS topic names from env var."""
275 if not env_value:
276 return ['/hart/input']
277 return [t.strip() for t in env_value.split(',') if t.strip()]
280def _is_image_topic(topic: str) -> bool:
281 """Heuristic: detect if a topic carries Image messages."""
282 image_keywords = ['image', 'camera', 'rgb', 'depth', 'frame']
283 topic_lower = topic.lower()
284 return any(kw in topic_lower for kw in image_keywords)