Coverage for integrations / channels / hardware / serial_adapter.py: 65.0%
123 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Serial Channel Adapter — PySerial-based UART/USB-to-serial communication.
4Bridges serial devices (Arduino, microcontrollers, sensors) to HART agents.
5Supports text_line (default), json_line, and binary_frame protocols.
7Usage:
8 from integrations.channels.hardware.serial_adapter import SerialAdapter
9 adapter = SerialAdapter(port='/dev/ttyUSB0', baud_rate=115200)
10 adapter.on_message(handler)
11 await adapter.start()
12"""
13import asyncio
14import json
15import logging
16import os
17import struct
18import threading
19import time
20import uuid
21from concurrent.futures import ThreadPoolExecutor
22from typing import Callable, Dict, List, Optional, Any
24from integrations.channels.base import (
25 ChannelAdapter, ChannelConfig, ChannelStatus,
26 Message, SendResult, MessageType,
27)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Frame protocol constants: delimiter bytes used by the 'binary_frame'
# protocol, whose frames are STX + 2-byte big-endian payload length +
# payload + ETX.
STX = 0x02  # Start of text: first byte of every binary frame
ETX = 0x03  # End of text: last byte of every binary frame
class SerialAdapter(ChannelAdapter):
    """Channel adapter for serial port communication.

    Incoming bytes are read on a background thread, split into messages
    according to the configured protocol, and handed to the message
    handlers registered on the adapter.

    Protocols:
        text_line: Newline-delimited text (default, Arduino Serial.println)
        json_line: Newline-delimited JSON objects
        binary_frame: STX + length(2) + payload + ETX framing

    Any other protocol value is treated like text_line.
    """

    # Event loop captured by connect(); the reader thread uses it to hand
    # messages back to async code. The class-level default keeps instances
    # that never called connect() on the fallback dispatch path.
    _loop: Optional[asyncio.AbstractEventLoop] = None

    def __init__(
        self,
        port: str = '',
        baud_rate: int = 115200,
        protocol: str = 'text_line',
        encoding: str = 'utf-8',
        reconnect_interval: int = 5,
        config: Optional[ChannelConfig] = None,
    ):
        """Configure the adapter; no port is opened until connect().

        Args:
            port: Serial device name. When empty, HEVOLVE_SERIAL_PORT is
                used; if that is empty too, connect() auto-detects a port.
            baud_rate: Line speed. HEVOLVE_SERIAL_BAUD, when set, takes
                precedence over this argument.
            protocol: 'text_line', 'json_line' or 'binary_frame'.
                HEVOLVE_SERIAL_PROTOCOL, when set, takes precedence.
            encoding: Text encoding used for both directions.
            reconnect_interval: Seconds to wait between reconnect attempts
                after a read error.
            config: Channel configuration; a default ChannelConfig is
                created when omitted.

        Raises:
            ValueError: If HEVOLVE_SERIAL_BAUD is set to a non-integer.
        """
        super().__init__(config or ChannelConfig())
        self._port = port or os.environ.get('HEVOLVE_SERIAL_PORT', '')
        self._baud_rate = int(os.environ.get('HEVOLVE_SERIAL_BAUD', str(baud_rate)))
        self._protocol = os.environ.get('HEVOLVE_SERIAL_PROTOCOL', protocol)
        self._encoding = encoding
        self._reconnect_interval = reconnect_interval
        self._serial = None
        self._read_thread = None
        self._executor = ThreadPoolExecutor(max_workers=1)

    @property
    def name(self) -> str:
        """Channel identifier for this adapter."""
        return 'serial'

    async def connect(self) -> bool:
        """Open the serial port and start the background reader thread.

        When no port is configured, the first port reported by pyserial's
        port listing is used.

        Returns:
            True when the port was opened and the reader started; False when
            pyserial is missing, no port could be found, or opening failed.
        """
        try:
            import serial
        except ImportError:
            logger.error("Serial adapter: pyserial not installed")
            return False

        if not self._port:
            # Auto-detect first available port
            from serial.tools import list_ports
            ports = list(list_ports.comports())
            if not ports:
                logger.error("Serial adapter: no ports found")
                return False
            self._port = ports[0].device
            logger.info(f"Serial adapter: auto-detected port {self._port}")

        try:
            self._serial = serial.Serial(
                port=self._port,
                baudrate=self._baud_rate,
                timeout=1.0,
            )
            logger.info(f"Serial adapter: connected to {self._port} @ {self._baud_rate}")
            self.status = ChannelStatus.CONNECTED

            # Remember the loop driving this coroutine: the reader thread has
            # no event loop of its own, so it needs an explicit target for
            # run_coroutine_threadsafe().
            try:
                self._loop = asyncio.get_running_loop()
            except RuntimeError:
                self._loop = None

            # NOTE(review): the reader loop runs only while self._running is
            # true, and this class only ever clears that flag; presumably the
            # base class sets it before calling connect() (confirm).
            self._read_thread = threading.Thread(
                target=self._read_loop, daemon=True)
            self._read_thread.start()
            return True
        except Exception as e:
            logger.error(f"Serial adapter: connect failed: {e}")
            self.status = ChannelStatus.ERROR
            return False

    async def disconnect(self) -> None:
        """Stop the reader loop, close the port and release the executor."""
        self._running = False
        try:
            if self._serial and self._serial.is_open:
                self._serial.close()
        finally:
            # Release the executor even when closing the port raises.
            self._executor.shutdown(wait=False)

    async def send_message(
        self, chat_id: str, text: str,
        reply_to: Optional[str] = None,
        media: Optional[List] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Write text to the serial port using the configured protocol.

        Args:
            chat_id: Included in the JSON object for 'json_line'; otherwise
                unused.
            text: Message body to transmit.
            reply_to: Accepted for interface compatibility; unused.
            media: Accepted for interface compatibility; unused.
            buttons: Accepted for interface compatibility; unused.

        Returns:
            A successful SendResult carrying a short random message id, or a
            failed SendResult describing why the write did not happen.
        """
        if not self._serial or not self._serial.is_open:
            return SendResult(success=False, error="Serial port not open")

        try:
            if self._protocol == 'json_line':
                line = json.dumps({'text': text, 'chat_id': chat_id}) + '\n'
                outgoing = line.encode(self._encoding)
            elif self._protocol == 'binary_frame':
                payload = text.encode(self._encoding)
                # The 2-byte length field makes struct.pack raise for
                # payloads over 65535 bytes; that is reported below as a
                # failed send.
                outgoing = struct.pack('>BH', STX, len(payload)) + payload + bytes([ETX])
            else:  # text_line
                outgoing = (text + '\n').encode(self._encoding)
            self._serial.write(outgoing)
            return SendResult(success=True, message_id=str(uuid.uuid4())[:8])
        except Exception as e:
            logger.error(f"Serial write failed: {e}")
            return SendResult(success=False, error=str(e))

    async def edit_message(self, chat_id: str, message_id: str,
                           text: str, buttons=None) -> SendResult:
        """Send `text` as a new message.

        Data already written to a serial stream cannot be changed, so an
        edit is transmitted as a fresh message; `message_id` and `buttons`
        are ignored.
        """
        return await self.send_message(chat_id, text)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Deletion is not applicable to a serial stream; always False."""
        return False  # Not applicable

    async def send_typing(self, chat_id: str) -> None:
        """Typing indicators are not applicable to a serial stream; no-op."""
        pass  # Not applicable

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Return the configured port name and baud rate; `chat_id` is unused."""
        return {'port': self._port, 'baud_rate': self._baud_rate}

    @staticmethod
    def _extract_frames(buffer: bytes) -> tuple:
        """Split complete binary frames off the front of `buffer`.

        Returns:
            (payloads, remainder): the payload bytes of every complete
            STX + length(2) + payload + ETX frame found, in order, and the
            unconsumed tail to keep for the next read.
        """
        stx = bytes([STX])
        payloads = []
        while len(buffer) >= 4:  # STX + 2-byte len + ETX minimum
            if buffer[0] != STX:
                # Resynchronise by jumping to the next candidate start byte
                # instead of re-slicing the buffer one byte at a time.
                start = buffer.find(stx)
                if start < 0:
                    return payloads, b''
                buffer = buffer[start:]
                continue
            payload_len = struct.unpack('>H', buffer[1:3])[0]
            frame_len = 3 + payload_len + 1
            if len(buffer) < frame_len:
                break  # frame still incomplete; wait for more data
            if buffer[frame_len - 1] != ETX:
                # Not a real frame start: skip this STX and resynchronise.
                buffer = buffer[1:]
                continue
            payloads.append(buffer[3:3 + payload_len])
            buffer = buffer[frame_len:]
        return payloads, buffer

    def _read_loop(self):
        """Background thread: read from serial port and dispatch messages.

        Runs while the adapter is marked running and the port is open. After
        a read error it retries the connection every `reconnect_interval`
        seconds until it succeeds or the adapter is stopped.
        """
        buffer = b''
        while self._running and self._serial and self._serial.is_open:
            try:
                data = self._serial.read(self._serial.in_waiting or 1)
                if not data:
                    continue  # read timed out with nothing received

                buffer += data
                if self._protocol == 'binary_frame':
                    payloads, buffer = self._extract_frames(buffer)
                    for payload in payloads:
                        self._dispatch_serial_message(
                            payload.decode(self._encoding, errors='replace'))
                else:
                    # Line-based protocols: everything before the last
                    # newline is complete; the tail stays buffered.
                    *lines, buffer = buffer.split(b'\n')
                    for raw_line in lines:
                        text = raw_line.decode(self._encoding, errors='replace').strip()
                        if text:
                            self._dispatch_serial_message(text)

            except Exception as e:
                if self._running:
                    logger.warning(f"Serial read error: {e}")
                    # Partial data from before the failure cannot be
                    # completed by bytes read after a reconnect.
                    buffer = b''
                    # Keep retrying: a single failed attempt would leave the
                    # port closed and silently end this thread.
                    while self._running:
                        time.sleep(self._reconnect_interval)
                        if self._try_reconnect():
                            break

    @staticmethod
    def _log_dispatch_failure(future) -> None:
        """Log an exception raised by a dispatch scheduled on the event loop."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(f"Serial handler error: {error}")

    def _dispatch_serial_message(self, text: str):
        """Create a Message and dispatch to handlers.

        For 'json_line', a JSON object's 'text' field (when present) becomes
        the message text; anything else is delivered as the raw line.
        """
        if self._protocol == 'json_line':
            try:
                data = json.loads(text)
            except json.JSONDecodeError:
                data = None
            # Valid JSON that is not an object (number, list, string) has no
            # .get(); deliver the raw line rather than raising into the
            # reader loop, where it would be mistaken for a port failure.
            if isinstance(data, dict):
                text = data.get('text', text)

        msg = Message(
            id=str(uuid.uuid4())[:8],
            channel='serial',
            sender_id=self._port,
            sender_name=f'serial:{self._port}',
            chat_id=self._port,
            text=text,
        )

        # Preferred path: the loop captured in connect(). This method normally
        # runs on the reader thread, which has no event loop of its own.
        captured = self._loop
        if captured is not None and captured.is_running():
            future = asyncio.run_coroutine_threadsafe(
                self._dispatch_message(msg), captured)
            future.add_done_callback(self._log_dispatch_failure)
            return

        # Dispatch in event loop if available, else synchronously
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                future = asyncio.run_coroutine_threadsafe(
                    self._dispatch_message(msg), loop)
                future.add_done_callback(self._log_dispatch_failure)
            else:
                loop.run_until_complete(self._dispatch_message(msg))
        except RuntimeError:
            # No event loop — call handlers synchronously
            for handler in self._message_handlers:
                try:
                    result = handler(msg)
                    if asyncio.iscoroutine(result):
                        # Async handler: run it to completion instead of
                        # dropping an un-awaited coroutine.
                        asyncio.run(result)
                except Exception as e:
                    logger.error(f"Serial handler error: {e}")

    def _try_reconnect(self) -> bool:
        """Attempt to reconnect to serial port after disconnect.

        Returns:
            True when the port was reopened, False otherwise.
        """
        try:
            import serial
            if self._serial:
                self._serial.close()
            self._serial = serial.Serial(
                port=self._port, baudrate=self._baud_rate, timeout=1.0)
            logger.info(f"Serial adapter: reconnected to {self._port}")
            return True
        except Exception as e:
            logger.debug(f"Serial reconnect failed: {e}")
            return False