Coverage for integrations / channels / response / router.py: 75.7%
103 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Channel Response Router — Routes agent responses to originating and bound channels.
4Handles:
51. Reply to the originating channel (where the message came from)
62. Fan-out to user's other active channel bindings (preferred first)
73. WAMP notification to desktop/web clients
84. ConversationEntry logging for unified history
9"""
11import asyncio
12import json
13import logging
14from datetime import datetime
15from typing import Optional, Dict, Any
17logger = logging.getLogger(__name__)
20class ChannelResponseRouter:
21 """Routes agent responses back through channels + WAMP + DB logging."""
23 def __init__(self, registry=None):
24 self._registry = registry
25 self._db_session_factory = None
27 def _get_registry(self):
28 if self._registry is None:
29 from integrations.channels.registry import get_registry
30 self._registry = get_registry()
31 return self._registry
33 def _get_db(self):
34 if self._db_session_factory is None:
35 from integrations.social.models import get_db
36 self._db_session_factory = get_db
37 return self._db_session_factory()
39 def route_response(
40 self,
41 user_id,
42 response_text: str,
43 channel_context: Optional[Dict[str, Any]] = None,
44 agent_id: Optional[str] = None,
45 fan_out: bool = True,
46 ):
47 """
48 Route an agent response to all relevant destinations.
50 Args:
51 user_id: The user who sent the original message
52 response_text: Agent's response text
53 channel_context: Originating channel info (channel, chat_id, sender_id, etc.)
54 agent_id: Optional agent ID for conversation logging
55 fan_out: Whether to send to other bound channels (not just originating)
56 """
57 originating_channel = None
58 originating_chat_id = None
60 if channel_context:
61 originating_channel = channel_context.get('channel')
62 originating_chat_id = channel_context.get('chat_id')
64 # 1. Log the assistant response
65 self._log_conversation(
66 user_id=user_id,
67 channel_type=originating_channel or 'system',
68 role='assistant',
69 content=response_text,
70 agent_id=agent_id,
71 )
73 # 2. Fan-out to bound channels (async, fire-and-forget)
74 if fan_out:
75 self._async_fan_out(
76 user_id=user_id,
77 text=response_text,
78 exclude_channel=originating_channel,
79 exclude_chat_id=originating_chat_id,
80 )
82 # 3. WAMP notification to desktop/web
83 self._notify_desktop_wamp(
84 user_id=user_id,
85 text=response_text,
86 channel_type=originating_channel,
87 )
89 def log_user_message(
90 self,
91 user_id,
92 channel_type: str,
93 content: str,
94 agent_id: Optional[str] = None,
95 ):
96 """Log an incoming user message to ConversationEntry."""
97 self._log_conversation(user_id, channel_type, 'user', content, agent_id)
99 def upsert_binding(
100 self,
101 user_id,
102 channel_type: str,
103 sender_id: str,
104 chat_id: Optional[str] = None,
105 ):
106 """Auto-upsert a UserChannelBinding on every incoming channel message."""
107 try:
108 db = self._get_db()
109 try:
110 from integrations.social.models import UserChannelBinding
111 existing = db.query(UserChannelBinding).filter_by(
112 user_id=str(user_id),
113 channel_type=channel_type,
114 channel_sender_id=sender_id,
115 ).first()
117 if existing:
118 existing.last_message_at = datetime.utcnow()
119 existing.is_active = True
120 if chat_id:
121 existing.channel_chat_id = chat_id
122 else:
123 binding = UserChannelBinding(
124 user_id=str(user_id),
125 channel_type=channel_type,
126 channel_sender_id=sender_id,
127 channel_chat_id=chat_id,
128 is_active=True,
129 is_preferred=False,
130 )
131 db.add(binding)
133 db.commit()
134 finally:
135 db.close()
136 except Exception as e:
137 logger.warning("Failed to upsert channel binding: %s", e)
139 def _log_conversation(self, user_id, channel_type, role, content, agent_id=None):
140 """Write a ConversationEntry row."""
141 try:
142 db = self._get_db()
143 try:
144 from integrations.social.models import ConversationEntry
145 entry = ConversationEntry(
146 user_id=str(user_id),
147 channel_type=channel_type,
148 role=role,
149 content=content[:10000], # cap at 10k chars
150 agent_id=agent_id,
151 )
152 db.add(entry)
153 db.commit()
154 finally:
155 db.close()
156 except Exception as e:
157 logger.debug("Failed to log conversation entry: %s", e)
159 def _async_fan_out(self, user_id, text, exclude_channel=None, exclude_chat_id=None):
160 """Fan-out response to all active bindings (fire-and-forget)."""
161 try:
162 db = self._get_db()
163 try:
164 from integrations.social.models import UserChannelBinding
165 bindings = db.query(UserChannelBinding).filter_by(
166 user_id=str(user_id),
167 is_active=True,
168 ).all()
170 # Sort: preferred first
171 bindings.sort(key=lambda b: (not b.is_preferred, b.channel_type))
173 registry = self._get_registry()
174 loop = getattr(registry, '_loop', None) or _get_running_loop()
176 for binding in bindings:
177 # Skip the originating channel to avoid double-send
178 if (binding.channel_type == exclude_channel
179 and binding.channel_chat_id == exclude_chat_id):
180 continue
181 if not binding.channel_chat_id:
182 continue
184 # Schedule async send
185 if loop and loop.is_running():
186 asyncio.run_coroutine_threadsafe(
187 self._send_to_binding(registry, binding, text),
188 loop,
189 )
190 else:
191 logger.debug("No event loop for fan-out to %s", binding.channel_type)
192 finally:
193 db.close()
194 except Exception as e:
195 logger.warning("Fan-out failed: %s", e)
197 @staticmethod
198 async def _send_to_binding(registry, binding, text):
199 """Send response to a single channel binding."""
200 try:
201 result = await registry.send_to_channel(
202 binding.channel_type,
203 binding.channel_chat_id,
204 text,
205 )
206 if not result.success:
207 logger.debug("Fan-out to %s failed: %s", binding.channel_type, result.error)
208 except Exception as e:
209 logger.debug("Fan-out to %s error: %s", binding.channel_type, e)
211 def _notify_desktop_wamp(self, user_id, text, channel_type=None):
212 """Publish to WAMP for desktop/web notification.
214 Singleton accessor — see core.safe_hartos_attr for why workers
215 must not eager-import hart_intelligence.
216 """
217 try:
218 from core.safe_hartos_attr import safe_hartos_attr
219 publish_async = safe_hartos_attr('publish_async')
220 if publish_async is None:
221 logger.debug(
222 "Channel response WAMP notify skipped: HARTOS "
223 "publish_async unresolvable — user=%s channel=%s",
224 user_id, channel_type,
225 )
226 return
227 notification = {
228 "text": [text[:200]],
229 "priority": 48,
230 "action": "ChannelResponse",
231 "channel": channel_type or "system",
232 "historical_request_id": [],
233 "options": [],
234 "newoptions": [],
235 }
236 payload = json.dumps(notification)
237 # Primary chat topic (existing desktop/web subscription)
238 from core.peer_link.message_bus import chat_topic_for
239 publish_async(chat_topic_for(user_id), payload)
240 # Dedicated channel response topic (cross-device)
241 publish_async(
242 f'com.hertzai.hevolve.channel.response.{user_id}',
243 payload,
244 )
245 logger.debug(
246 "Channel response WAMP notify published: user=%s channel=%s",
247 user_id, channel_type,
248 )
249 except Exception as e:
250 logger.debug(
251 "Channel response WAMP notify failed: user=%s err=%s",
252 user_id, e,
253 )
256def _get_running_loop():
257 """Try to get a running event loop."""
258 try:
259 return asyncio.get_event_loop()
260 except RuntimeError:
261 return None
264# Singleton
265_router_instance = None
268def get_response_router(registry=None) -> ChannelResponseRouter:
269 """Get or create the singleton ChannelResponseRouter."""
270 global _router_instance
271 if _router_instance is None:
272 _router_instance = ChannelResponseRouter(registry=registry)
273 return _router_instance