Coverage for integrations / channels / hardware / serial_adapter.py: 65.0%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Serial Channel Adapter — PySerial-based UART/USB-to-serial communication. 

3 

4Bridges serial devices (Arduino, microcontrollers, sensors) to HART agents. 

5Supports text_line (default), json_line, and binary_frame protocols. 

6 

7Usage: 

8 from integrations.channels.hardware.serial_adapter import SerialAdapter 

9 adapter = SerialAdapter(port='/dev/ttyUSB0', baud_rate=115200) 

10 adapter.on_message(handler) 

11 await adapter.start() 

12""" 

13import asyncio 

14import json 

15import logging 

16import os 

17import struct 

18import threading 

19import time 

20import uuid 

21from concurrent.futures import ThreadPoolExecutor 

22from typing import Callable, Dict, List, Optional, Any 

23 

24from integrations.channels.base import ( 

25 ChannelAdapter, ChannelConfig, ChannelStatus, 

26 Message, SendResult, MessageType, 

27) 

28 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Delimiter bytes for the binary_frame protocol: every frame is
# STX, a 2-byte payload length, the payload, then ETX.
STX = 0x02  # Start of text: first byte of a frame
ETX = 0x03  # End of text: last byte of a frame

34 

35 

class SerialAdapter(ChannelAdapter):
    """Channel adapter for serial port communication.

    Protocols:
        text_line:    Newline-delimited text (default, Arduino Serial.println)
        json_line:    Newline-delimited JSON objects
        binary_frame: STX + length(2) + payload + ETX framing

    Incoming bytes are read on a daemon thread started by ``connect()`` and
    turned into ``Message`` objects; outgoing text is written synchronously
    by ``send_message()``.
    """

    # Largest payload the 2-byte big-endian length field of a binary frame
    # can describe.
    _MAX_FRAME_PAYLOAD = 0xFFFF

    def __init__(
        self,
        port: str = '',
        baud_rate: int = 115200,
        protocol: str = 'text_line',
        encoding: str = 'utf-8',
        reconnect_interval: int = 5,
        config: Optional[ChannelConfig] = None,
    ):
        """Store connection settings; the port is not opened until connect().

        Args:
            port: Serial device name. When empty, HEVOLVE_SERIAL_PORT is used;
                if that is empty too, connect() auto-detects a port.
            baud_rate: Baud rate, used unless HEVOLVE_SERIAL_BAUD is set.
            protocol: 'text_line', 'json_line' or 'binary_frame', used unless
                HEVOLVE_SERIAL_PROTOCOL is set. Any other value is treated
                like 'text_line' by the read and write paths.
            encoding: Text encoding for payloads in both directions.
            reconnect_interval: Seconds to wait between reconnect attempts
                after a read error.
            config: Channel configuration; a default ChannelConfig() is
                created when omitted.

        Raises:
            ValueError: If HEVOLVE_SERIAL_BAUD is set to a non-integer value.
        """
        super().__init__(config or ChannelConfig())
        self._port = port or os.environ.get('HEVOLVE_SERIAL_PORT', '')
        self._baud_rate = int(os.environ.get('HEVOLVE_SERIAL_BAUD', str(baud_rate)))
        self._protocol = os.environ.get('HEVOLVE_SERIAL_PROTOCOL', protocol)
        self._encoding = encoding
        self._reconnect_interval = reconnect_interval
        self._serial = None
        self._read_thread = None
        self._executor = ThreadPoolExecutor(max_workers=1)
        # Event loop that awaited connect(); the reader thread hands incoming
        # messages to it. None until connect() has run.
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    @property
    def name(self) -> str:
        """Channel identifier for this adapter."""
        return 'serial'

    async def connect(self) -> bool:
        """Open the serial port and start the background reader thread.

        When no port is configured, the first port reported by pyserial's
        ``list_ports.comports()`` is used.

        Returns:
            True when the port was opened and the reader thread started;
            False when pyserial is missing, no port could be found, or
            opening the port failed (status is set to ERROR in that case).
        """
        try:
            import serial
        except ImportError:
            logger.error("Serial adapter: pyserial not installed")
            return False

        if not self._port:
            # Auto-detect first available port
            from serial.tools import list_ports
            ports = list(list_ports.comports())
            if not ports:
                logger.error("Serial adapter: no ports found")
                return False
            self._port = ports[0].device
            logger.info(f"Serial adapter: auto-detected port {self._port}")

        try:
            self._serial = serial.Serial(
                port=self._port,
                baudrate=self._baud_rate,
                timeout=1.0,
            )
            logger.info(f"Serial adapter: connected to {self._port} @ {self._baud_rate}")
            self.status = ChannelStatus.CONNECTED

            # The reader thread has no event loop of its own, so remember the
            # loop running this coroutine for cross-thread dispatch.
            self._loop = asyncio.get_running_loop()
            # disconnect() clears this flag and the reader loop requires it;
            # set it here so the loop runs even if connect() is awaited
            # directly (the base class may also manage it — not visible here).
            self._running = True

            # Start read thread
            self._read_thread = threading.Thread(
                target=self._read_loop, daemon=True)
            self._read_thread.start()
            return True
        except Exception as e:
            logger.error(f"Serial adapter: connect failed: {e}")
            self.status = ChannelStatus.ERROR
            return False

    async def disconnect(self) -> None:
        """Stop the reader loop, close the port and shut down the executor."""
        # Clear the flag first so the reader thread treats the error raised
        # by the closing port as a shutdown instead of trying to reconnect.
        self._running = False
        if self._serial and self._serial.is_open:
            self._serial.close()
        self._executor.shutdown(wait=False)

    async def send_message(
        self, chat_id: str, text: str,
        reply_to: Optional[str] = None,
        media: Optional[List] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Write text to the serial port using the configured protocol.

        Args:
            chat_id: Included in the JSON object for 'json_line'; otherwise
                unused.
            text: Text to send.
            reply_to: Ignored.
            media: Ignored.
            buttons: Ignored.

        Returns:
            A successful SendResult carrying a generated 8-character message
            id, or a failed SendResult with an error string when the port is
            not open, the payload cannot be encoded, or the write fails.
        """
        if not self._serial or not self._serial.is_open:
            return SendResult(success=False, error="Serial port not open")

        try:
            self._serial.write(self._encode_outgoing(chat_id, text))
            return SendResult(success=True, message_id=str(uuid.uuid4())[:8])
        except Exception as e:
            logger.error(f"Serial write failed: {e}")
            return SendResult(success=False, error=str(e))

    def _encode_outgoing(self, chat_id: str, text: str) -> bytes:
        """Return the bytes to write for ``text`` under the active protocol.

        Raises:
            ValueError: For 'binary_frame' when the encoded payload does not
                fit the 2-byte length field.
        """
        if self._protocol == 'json_line':
            line = json.dumps({'text': text, 'chat_id': chat_id}) + '\n'
            return line.encode(self._encoding)
        if self._protocol == 'binary_frame':
            payload = text.encode(self._encoding)
            if len(payload) > self._MAX_FRAME_PAYLOAD:
                raise ValueError(
                    f"binary_frame payload is {len(payload)} bytes; "
                    f"maximum is {self._MAX_FRAME_PAYLOAD}")
            return struct.pack('>BH', STX, len(payload)) + payload + bytes([ETX])
        # text_line (also the fallback for unrecognised protocol names)
        return (text + '\n').encode(self._encoding)

    async def edit_message(self, chat_id: str, message_id: str,
                           text: str, buttons=None) -> SendResult:
        """Send ``text`` as a new message.

        Nothing already written to the stream is changed; ``message_id`` and
        ``buttons`` are ignored.
        """
        return await self.send_message(chat_id, text)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Always return False; deletion is not applicable to this channel."""
        return False  # Not applicable

    async def send_typing(self, chat_id: str) -> None:
        """Do nothing; typing indicators are not applicable to this channel."""
        pass  # Not applicable

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Return the configured port and baud rate; ``chat_id`` is ignored."""
        return {'port': self._port, 'baud_rate': self._baud_rate}

    def _read_loop(self):
        """Background thread: read from serial port and dispatch messages.

        Only a failing read triggers the reconnect path. Errors raised while
        decoding or dispatching a message are logged and the loop carries on,
        so a bad payload or a failing handler does not reset the port.
        """
        buffer = b''
        while self._running and self._serial and self._serial.is_open:
            try:
                data = self._serial.read(self._serial.in_waiting or 1)
            except Exception as e:
                if not self._running:
                    # The port was closed by disconnect(); stop quietly.
                    break
                logger.warning(f"Serial read error: {e}")
                # Bytes received before the drop can no longer be completed;
                # keeping them would glue them onto post-reconnect data.
                buffer = b''
                self._wait_for_reconnect()
                continue

            if not data:
                # Read timed out without data.
                continue

            buffer += data
            framed = self._protocol == 'binary_frame'
            if framed:
                chunks, buffer = self._split_frames(buffer)
            else:
                chunks, buffer = self._split_lines(buffer)

            for raw in chunks:
                try:
                    text = raw.decode(self._encoding, errors='replace')
                    if not framed:
                        # Line protocols: trim whitespace (including '\r')
                        # and skip blank lines.
                        text = text.strip()
                        if not text:
                            continue
                    self._dispatch_serial_message(text)
                except Exception as e:
                    logger.error(f"Serial handler error: {e}")

    @staticmethod
    def _split_frames(buffer: bytes):
        """Extract complete binary frames from ``buffer``.

        Returns:
            ``(payloads, remainder)``: the payload bytes of every complete
            frame in order, and the unconsumed tail of the buffer.
        """
        payloads = []
        stx = bytes([STX])
        while len(buffer) >= 4:  # STX + 2-byte len + ETX minimum
            if buffer[0] != STX:
                # Resynchronise: drop everything before the next STX.
                start = buffer.find(stx)
                buffer = buffer[start:] if start >= 0 else b''
                continue
            payload_len = struct.unpack('>H', buffer[1:3])[0]
            frame_len = 3 + payload_len + 1
            if len(buffer) < frame_len:
                break  # Frame not fully received yet.
            if buffer[frame_len - 1] != ETX:
                # Not a real frame start; skip this STX and rescan.
                buffer = buffer[1:]
                continue
            payloads.append(buffer[3:3 + payload_len])
            buffer = buffer[frame_len:]
        return payloads, buffer

    @staticmethod
    def _split_lines(buffer: bytes):
        """Split ``buffer`` on newlines.

        Returns:
            ``(lines, remainder)``: every newline-terminated line (without
            the newline) and the trailing partial line.
        """
        *lines, remainder = buffer.split(b'\n')
        return lines, remainder

    def _wait_for_reconnect(self):
        """Retry reopening the port every reconnect interval.

        Returns once a reconnect succeeds or the adapter stops running.
        """
        while self._running:
            time.sleep(self._reconnect_interval)
            # Re-check after sleeping so a disconnect() issued meanwhile is
            # not undone by reopening the port.
            if not self._running or self._try_reconnect():
                return

    def _dispatch_serial_message(self, text: str):
        """Create a Message and dispatch to handlers.

        For 'json_line', a JSON object's 'text' field replaces the raw line.
        Lines that are not valid JSON, or are JSON but not an object, are
        passed through unchanged.
        """
        if self._protocol == 'json_line':
            try:
                data = json.loads(text)
            except json.JSONDecodeError:
                data = None
            # Scalars and arrays are valid JSON but have no 'text' field.
            if isinstance(data, dict):
                text = data.get('text', text)

        msg = Message(
            id=str(uuid.uuid4())[:8],
            channel='serial',
            sender_id=self._port,
            sender_name=f'serial:{self._port}',
            chat_id=self._port,
            text=text,
        )

        # Preferred path: hand the message to the loop captured by connect().
        # asyncio.get_event_loop() cannot find it from the reader thread.
        loop = self._loop
        if loop is not None and loop.is_running():
            self._schedule_dispatch(msg, loop)
            return

        # Fallback when connect() was never awaited or its loop has stopped.
        try:
            current = asyncio.get_event_loop()
            if current.is_running():
                self._schedule_dispatch(msg, current)
            else:
                current.run_until_complete(self._dispatch_message(msg))
        except RuntimeError:
            # No event loop — call handlers synchronously
            for handler in self._message_handlers:
                try:
                    outcome = handler(msg)
                    if asyncio.iscoroutine(outcome):
                        # Async handler: run it to completion rather than
                        # dropping an un-awaited coroutine.
                        asyncio.run(outcome)
                except Exception as e:
                    logger.error(f"Serial handler error: {e}")

    def _schedule_dispatch(self, msg, loop):
        """Submit the message dispatch to ``loop`` from this thread."""
        future = asyncio.run_coroutine_threadsafe(
            self._dispatch_message(msg), loop)
        future.add_done_callback(self._log_dispatch_failure)

    @staticmethod
    def _log_dispatch_failure(future):
        """Log an exception raised by a dispatch scheduled on the loop."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(f"Serial handler error: {error}")

    def _try_reconnect(self):
        """Attempt to reconnect to serial port after disconnect.

        Returns:
            True when the port was reopened, False when the attempt failed.
        """
        try:
            import serial
            if self._serial:
                self._serial.close()
            self._serial = serial.Serial(
                port=self._port, baudrate=self._baud_rate, timeout=1.0)
            logger.info(f"Serial adapter: reconnected to {self._port}")
            return True
        except Exception as e:
            logger.debug(f"Serial reconnect failed: {e}")
            return False