Coverage for integrations / channels / registry.py: 54.5%
99 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Channel Registry
4Manages all channel adapters and provides unified access.
5Handles routing messages to/from the agent system.
6"""
8import asyncio
9import logging
10from typing import Dict, Optional, Callable, Any, List
11from dataclasses import dataclass, field
12from core.port_registry import get_port
14from .base import (
15 ChannelAdapter,
16 ChannelConfig,
17 ChannelStatus,
18 Message,
19 SendResult,
20 MediaAttachment,
21)
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
@dataclass
class ChannelRegistryConfig:
    """
    Configuration for channel registry.

    When ``agent_callback_url`` is left as ``None`` it is filled in after
    construction with the local backend's ``/chat`` endpoint, using the port
    returned by ``get_port('backend')``.
    """

    # Imported inside the class body: besides supplying the field defaults
    # below, both names also end up as plain (un-annotated) class attributes,
    # which the dataclass machinery does not treat as fields.
    from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID

    # None means "derive the URL in __post_init__".
    agent_callback_url: Optional[str] = None
    # NOTE(review): presumably the identity used when a channel message carries
    # no user/prompt mapping of its own - confirm against the callers.
    default_user_id: int = DEFAULT_USER_ID
    default_prompt_id: int = DEFAULT_PROMPT_ID
    # NOTE(review): not read anywhere in this module - confirm the consumer.
    enable_create_mode: bool = False

    def __post_init__(self):
        """Resolve the default agent callback URL when none was supplied."""
        if self.agent_callback_url is None:
            self.agent_callback_url = f"http://localhost:{get_port('backend')}/chat"
class ChannelRegistry:
    """
    Central registry for all messaging channel adapters.

    Provides:
    - Unified message routing
    - Channel lifecycle management
    - Agent integration
    """

    def __init__(self, config: Optional[ChannelRegistryConfig] = None):
        """
        Create an empty registry.

        Args:
            config: Registry configuration; a default ChannelRegistryConfig
                is created when omitted.
        """
        self.config = config or ChannelRegistryConfig()
        self._adapters: Dict[str, ChannelAdapter] = {}
        self._agent_handler: Optional[Callable] = None
        self._running = False

    def register(self, adapter: ChannelAdapter) -> None:
        """
        Register a channel adapter.

        An adapter already registered under the same name is replaced
        (a warning is logged).

        Args:
            adapter: Channel adapter instance
        """
        if adapter.name in self._adapters:
            logger.warning(f"Replacing existing adapter for {adapter.name}")

        self._adapters[adapter.name] = adapter

        # Set up message routing
        adapter.on_message(self._route_to_agent)

        logger.info(f"Registered channel adapter: {adapter.name}")

    def unregister(self, channel_name: str) -> None:
        """Unregister a channel adapter; unknown names are ignored."""
        if self._adapters.pop(channel_name, None) is not None:
            logger.info(f"Unregistered channel adapter: {channel_name}")

    def get(self, channel_name: str) -> Optional[ChannelAdapter]:
        """Get adapter by name, or None when it is not registered."""
        return self._adapters.get(channel_name)

    def list_channels(self) -> List[str]:
        """List all registered channel names."""
        return list(self._adapters)

    def get_status(self) -> Dict[str, ChannelStatus]:
        """Get status of all channels, keyed by channel name."""
        return {name: adapter.get_status() for name, adapter in self._adapters.items()}

    def set_agent_handler(self, handler: Callable[[Message], str]) -> None:
        """
        Set the agent handler function.

        This function receives messages and returns agent responses. It may
        return the response directly or return an awaitable that resolves
        to it.

        Args:
            handler: Function (Message) -> str (response), sync or async
        """
        self._agent_handler = handler

    async def _route_to_agent(self, message: Message) -> None:
        """
        Route incoming message to agent and send response.

        This is the core integration point between channels and the agent system.
        The message is dropped (with a log entry) when no handler is set or
        when its channel has no registered adapter. Any exception raised
        while processing is logged and answered with a generic apology on
        the originating chat; it never propagates to the adapter.
        """
        # Imported locally; only this method needs it.
        import inspect

        if not self._agent_handler:
            logger.warning("No agent handler set, ignoring message")
            return

        adapter = self._adapters.get(message.channel)
        if not adapter:
            logger.error(f"No adapter found for channel: {message.channel}")
            return

        try:
            # Send typing indicator
            await adapter.send_typing(message.chat_id)

            # Get response from agent. isawaitable() also covers Tasks and
            # Futures, which a plain iscoroutine() check would pass through
            # un-awaited as the message text.
            response = self._agent_handler(message)
            if inspect.isawaitable(response):
                response = await response

            if response:
                # Send response back to channel
                await adapter.send_message(
                    chat_id=message.chat_id,
                    text=response,
                    reply_to=message.id,
                )

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Error routing message to agent: {e}")
            # Best effort: tell the user something went wrong.
            try:
                await adapter.send_message(
                    chat_id=message.chat_id,
                    text="Sorry, I encountered an error processing your message.",
                    reply_to=message.id,
                )
            except Exception:
                # The channel itself is probably broken; record it and move on.
                logger.debug("Could not deliver error notice to user", exc_info=True)

    async def send_to_channel(
        self,
        channel: str,
        chat_id: str,
        text: str,
        **kwargs
    ) -> SendResult:
        """
        Send a message to a specific channel.

        Args:
            channel: Channel name (telegram, discord, etc.)
            chat_id: Target chat ID
            text: Message text
            **kwargs: Additional arguments (media, buttons, etc.)

        Returns:
            The adapter's SendResult, or a failed SendResult when the channel
            is unknown or its adapter is not running.
        """
        adapter = self._adapters.get(channel)
        if not adapter:
            return SendResult(success=False, error=f"Unknown channel: {channel}")

        if not adapter.is_running():
            return SendResult(success=False, error=f"Channel {channel} not connected")

        return await adapter.send_message(chat_id, text, **kwargs)

    async def broadcast(
        self,
        text: str,
        channels: Optional[List[str]] = None,
        chat_ids: Optional[Dict[str, str]] = None,
    ) -> Dict[str, SendResult]:
        """
        Broadcast message to multiple channels.

        Channels are sent to one after another, in the order given.

        Args:
            text: Message text
            channels: List of channels to broadcast to (all if None; an
                empty list targets no channel)
            chat_ids: Dict of channel -> chat_id mappings

        Returns:
            Dict of channel -> SendResult
        """
        results = {}
        # Only None means "every channel"; an explicit empty list stays empty.
        target_channels = list(self._adapters) if channels is None else channels

        for channel in target_channels:
            if channel not in self._adapters:
                results[channel] = SendResult(success=False, error="Unknown channel")
                continue

            chat_id = chat_ids.get(channel) if chat_ids else None
            if not chat_id:
                results[channel] = SendResult(success=False, error="No chat_id for channel")
                continue

            results[channel] = await self.send_to_channel(channel, chat_id, text)

        return results

    async def start_all(self) -> None:
        """
        Start all registered channel adapters.

        Adapters are started concurrently. A failing adapter does not stop
        the others; its failure is logged together with the exception it
        raised, if any.
        """
        self._running = True

        # Snapshot so each result can be paired with the adapter that produced it.
        adapters = list(self._adapters.items())
        outcomes = await asyncio.gather(
            *(adapter.start() for _, adapter in adapters),
            return_exceptions=True,
        )

        # Log status
        for (name, adapter), outcome in zip(adapters, outcomes):
            status = adapter.get_status()
            if status == ChannelStatus.CONNECTED:
                logger.info(f"Channel {name} started successfully")
            else:
                error = outcome if isinstance(outcome, BaseException) else None
                logger.error(f"Channel {name} failed to start: {status}", exc_info=error)

    async def stop_all(self) -> None:
        """
        Stop all channel adapters.

        Adapters are stopped concurrently; an exception raised by one of
        them is logged and does not prevent the others from stopping.
        """
        self._running = False

        adapters = list(self._adapters.items())
        outcomes = await asyncio.gather(
            *(adapter.stop() for _, adapter in adapters),
            return_exceptions=True,
        )

        for (name, _), outcome in zip(adapters, outcomes):
            if isinstance(outcome, BaseException):
                logger.warning(f"Channel {name} raised while stopping: {outcome!r}")

        logger.info("All channels stopped")

    def is_running(self) -> bool:
        """Check if registry is running."""
        return self._running
# Global registry instance
# Stays None until get_registry() creates it or set_registry() installs one.
_registry: Optional[ChannelRegistry] = None


def get_registry() -> ChannelRegistry:
    """
    Get or create the global channel registry.

    Returns:
        The module-wide registry; a default ChannelRegistry is created on
        first use when none has been set.
    """
    global _registry
    if _registry is None:
        _registry = ChannelRegistry()
    return _registry


def set_registry(registry: ChannelRegistry) -> None:
    """
    Set the global channel registry.

    Args:
        registry: Instance that later get_registry() calls will return.
    """
    global _registry
    _registry = registry