Coverage for integrations / channels / registry.py: 54.5%

99 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Channel Registry 

3 

4Manages all channel adapters and provides unified access. 

5Handles routing messages to/from the agent system. 

6""" 

7 

8import asyncio 

9import logging 

10from typing import Dict, Optional, Callable, Any, List 

11from dataclasses import dataclass, field 

12from core.port_registry import get_port 

13 

14from .base import ( 

15 ChannelAdapter, 

16 ChannelConfig, 

17 ChannelStatus, 

18 Message, 

19 SendResult, 

20 MediaAttachment, 

21) 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26@dataclass 

27class ChannelRegistryConfig: 

28 """Configuration for channel registry.""" 

29 from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID 

30 agent_callback_url: str = None 

31 default_user_id: int = DEFAULT_USER_ID 

32 default_prompt_id: int = DEFAULT_PROMPT_ID 

33 enable_create_mode: bool = False 

34 

35 def __post_init__(self): 

36 if self.agent_callback_url is None: 

37 self.agent_callback_url = f"http://localhost:{get_port('backend')}/chat" 

38 

39 

40class ChannelRegistry: 

41 """ 

42 Central registry for all messaging channel adapters. 

43 

44 Provides: 

45 - Unified message routing 

46 - Channel lifecycle management 

47 - Agent integration 

48 """ 

49 

50 def __init__(self, config: ChannelRegistryConfig = None): 

51 self.config = config or ChannelRegistryConfig() 

52 self._adapters: Dict[str, ChannelAdapter] = {} 

53 self._agent_handler: Optional[Callable] = None 

54 self._running = False 

55 

56 def register(self, adapter: ChannelAdapter) -> None: 

57 """ 

58 Register a channel adapter. 

59 

60 Args: 

61 adapter: Channel adapter instance 

62 """ 

63 if adapter.name in self._adapters: 

64 logger.warning(f"Replacing existing adapter for {adapter.name}") 

65 

66 self._adapters[adapter.name] = adapter 

67 

68 # Set up message routing 

69 adapter.on_message(self._route_to_agent) 

70 

71 logger.info(f"Registered channel adapter: {adapter.name}") 

72 

73 def unregister(self, channel_name: str) -> None: 

74 """Unregister a channel adapter.""" 

75 if channel_name in self._adapters: 

76 del self._adapters[channel_name] 

77 logger.info(f"Unregistered channel adapter: {channel_name}") 

78 

79 def get(self, channel_name: str) -> Optional[ChannelAdapter]: 

80 """Get adapter by name.""" 

81 return self._adapters.get(channel_name) 

82 

83 def list_channels(self) -> List[str]: 

84 """List all registered channel names.""" 

85 return list(self._adapters.keys()) 

86 

87 def get_status(self) -> Dict[str, ChannelStatus]: 

88 """Get status of all channels.""" 

89 return {name: adapter.get_status() for name, adapter in self._adapters.items()} 

90 

91 def set_agent_handler(self, handler: Callable[[Message], str]) -> None: 

92 """ 

93 Set the agent handler function. 

94 

95 This function receives messages and returns agent responses. 

96 

97 Args: 

98 handler: Async function (Message) -> str (response) 

99 """ 

100 self._agent_handler = handler 

101 

102 async def _route_to_agent(self, message: Message) -> None: 

103 """ 

104 Route incoming message to agent and send response. 

105 

106 This is the core integration point between channels and the agent system. 

107 """ 

108 if not self._agent_handler: 

109 logger.warning("No agent handler set, ignoring message") 

110 return 

111 

112 adapter = self._adapters.get(message.channel) 

113 if not adapter: 

114 logger.error(f"No adapter found for channel: {message.channel}") 

115 return 

116 

117 try: 

118 # Send typing indicator 

119 await adapter.send_typing(message.chat_id) 

120 

121 # Get response from agent 

122 response = self._agent_handler(message) 

123 if asyncio.iscoroutine(response): 

124 response = await response 

125 

126 if response: 

127 # Send response back to channel 

128 await adapter.send_message( 

129 chat_id=message.chat_id, 

130 text=response, 

131 reply_to=message.id, 

132 ) 

133 

134 except Exception as e: 

135 logger.error(f"Error routing message to agent: {e}") 

136 # Optionally send error message to user 

137 try: 

138 await adapter.send_message( 

139 chat_id=message.chat_id, 

140 text="Sorry, I encountered an error processing your message.", 

141 reply_to=message.id, 

142 ) 

143 except Exception: 

144 pass 

145 

146 async def send_to_channel( 

147 self, 

148 channel: str, 

149 chat_id: str, 

150 text: str, 

151 **kwargs 

152 ) -> SendResult: 

153 """ 

154 Send a message to a specific channel. 

155 

156 Args: 

157 channel: Channel name (telegram, discord, etc.) 

158 chat_id: Target chat ID 

159 text: Message text 

160 **kwargs: Additional arguments (media, buttons, etc.) 

161 """ 

162 adapter = self._adapters.get(channel) 

163 if not adapter: 

164 return SendResult(success=False, error=f"Unknown channel: {channel}") 

165 

166 if not adapter.is_running(): 

167 return SendResult(success=False, error=f"Channel {channel} not connected") 

168 

169 return await adapter.send_message(chat_id, text, **kwargs) 

170 

171 async def broadcast( 

172 self, 

173 text: str, 

174 channels: Optional[List[str]] = None, 

175 chat_ids: Optional[Dict[str, str]] = None, 

176 ) -> Dict[str, SendResult]: 

177 """ 

178 Broadcast message to multiple channels. 

179 

180 Args: 

181 text: Message text 

182 channels: List of channels to broadcast to (all if None) 

183 chat_ids: Dict of channel -> chat_id mappings 

184 

185 Returns: 

186 Dict of channel -> SendResult 

187 """ 

188 results = {} 

189 target_channels = channels or list(self._adapters.keys()) 

190 

191 for channel in target_channels: 

192 if channel not in self._adapters: 

193 results[channel] = SendResult(success=False, error="Unknown channel") 

194 continue 

195 

196 chat_id = chat_ids.get(channel) if chat_ids else None 

197 if not chat_id: 

198 results[channel] = SendResult(success=False, error="No chat_id for channel") 

199 continue 

200 

201 results[channel] = await self.send_to_channel(channel, chat_id, text) 

202 

203 return results 

204 

205 async def start_all(self) -> None: 

206 """Start all registered channel adapters.""" 

207 self._running = True 

208 

209 tasks = [adapter.start() for adapter in self._adapters.values()] 

210 await asyncio.gather(*tasks, return_exceptions=True) 

211 

212 # Log status 

213 for name, adapter in self._adapters.items(): 

214 status = adapter.get_status() 

215 if status == ChannelStatus.CONNECTED: 

216 logger.info(f"Channel {name} started successfully") 

217 else: 

218 logger.error(f"Channel {name} failed to start: {status}") 

219 

220 async def stop_all(self) -> None: 

221 """Stop all channel adapters.""" 

222 self._running = False 

223 

224 tasks = [adapter.stop() for adapter in self._adapters.values()] 

225 await asyncio.gather(*tasks, return_exceptions=True) 

226 

227 logger.info("All channels stopped") 

228 

229 def is_running(self) -> bool: 

230 """Check if registry is running.""" 

231 return self._running 

232 

233 

234# Global registry instance 

235_registry: Optional[ChannelRegistry] = None 

236 

237 

238def get_registry() -> ChannelRegistry: 

239 """Get or create the global channel registry.""" 

240 global _registry 

241 if _registry is None: 

242 _registry = ChannelRegistry() 

243 return _registry 

244 

245 

246def set_registry(registry: ChannelRegistry) -> None: 

247 """Set the global channel registry.""" 

248 global _registry 

249 _registry = registry