Coverage for integrations / channels / response / router.py: 75.7%

103 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Channel Response Router — Routes agent responses to originating and bound channels. 

3 

4Handles: 

51. Reply to the originating channel (where the message came from) 

62. Fan-out to user's other active channel bindings (preferred first) 

73. WAMP notification to desktop/web clients 

84. ConversationEntry logging for unified history 

9""" 

10 

11import asyncio 

12import json 

13import logging 

14from datetime import datetime 

15from typing import Optional, Dict, Any 

16 

17logger = logging.getLogger(__name__) 

18 

19 

class ChannelResponseRouter:
    """Routes agent responses back through channels + WAMP + DB logging.

    The channel registry and the DB session factory are resolved lazily on
    first use, so constructing a router performs no project imports. Every
    logging/delivery step wraps its work in ``try/except Exception`` and only
    logs failures, so none of the public methods raise to the caller.
    """

    def __init__(self, registry=None):
        """Create a router.

        Args:
            registry: Optional channel registry used for sending. When left
                as ``None`` it is fetched on first use through
                ``integrations.channels.registry.get_registry``.
        """
        self._registry = registry
        # Callable that returns a DB session; filled in lazily by _get_db().
        self._db_session_factory = None

    def _get_registry(self):
        """Return the channel registry, importing the default one on first use."""
        if self._registry is None:
            # Imported lazily so the module can be loaded without the registry.
            from integrations.channels.registry import get_registry
            self._registry = get_registry()
        return self._registry

    def _get_db(self):
        """Return a new DB session from the (lazily imported) session factory."""
        if self._db_session_factory is None:
            from integrations.social.models import get_db
            self._db_session_factory = get_db
        return self._db_session_factory()

    def route_response(
        self,
        user_id,
        response_text: str,
        channel_context: Optional[Dict[str, Any]] = None,
        agent_id: Optional[str] = None,
        fan_out: bool = True,
    ):
        """
        Route an agent response to all relevant destinations.

        Steps run in this order: conversation logging, optional fan-out to
        the user's bound channels, then the WAMP notification.

        Args:
            user_id: The user who sent the original message
            response_text: Agent's response text
            channel_context: Originating channel info; the 'channel' and
                'chat_id' keys are read here
            agent_id: Optional agent ID for conversation logging
            fan_out: Whether to send to other bound channels (not just originating)
        """
        originating_channel = None
        originating_chat_id = None

        if channel_context:
            originating_channel = channel_context.get('channel')
            originating_chat_id = channel_context.get('chat_id')

        # 1. Log the assistant response ('system' when no origin is known)
        self._log_conversation(
            user_id=user_id,
            channel_type=originating_channel or 'system',
            role='assistant',
            content=response_text,
            agent_id=agent_id,
        )

        # 2. Fan-out to bound channels (async, fire-and-forget)
        if fan_out:
            self._async_fan_out(
                user_id=user_id,
                text=response_text,
                exclude_channel=originating_channel,
                exclude_chat_id=originating_chat_id,
            )

        # 3. WAMP notification to desktop/web
        self._notify_desktop_wamp(
            user_id=user_id,
            text=response_text,
            channel_type=originating_channel,
        )

    def log_user_message(
        self,
        user_id,
        channel_type: str,
        content: str,
        agent_id: Optional[str] = None,
    ):
        """Log an incoming user message to ConversationEntry (role 'user')."""
        self._log_conversation(user_id, channel_type, 'user', content, agent_id)

    def upsert_binding(
        self,
        user_id,
        channel_type: str,
        sender_id: str,
        chat_id: Optional[str] = None,
    ):
        """Auto-upsert a UserChannelBinding on every incoming channel message.

        A binding is looked up by (user_id, channel_type, sender_id). An
        existing row is re-activated and time-stamped; its chat id is only
        overwritten when a non-empty ``chat_id`` is supplied. Otherwise a new
        active, non-preferred binding is inserted.

        Failures are logged at WARNING level and swallowed.
        """
        try:
            db = self._get_db()
            try:
                from integrations.social.models import UserChannelBinding
                existing = db.query(UserChannelBinding).filter_by(
                    user_id=str(user_id),
                    channel_type=channel_type,
                    channel_sender_id=sender_id,
                ).first()

                if existing:
                    existing.last_message_at = datetime.utcnow()
                    existing.is_active = True
                    if chat_id:
                        existing.channel_chat_id = chat_id
                else:
                    binding = UserChannelBinding(
                        user_id=str(user_id),
                        channel_type=channel_type,
                        channel_sender_id=sender_id,
                        channel_chat_id=chat_id,
                        is_active=True,
                        is_preferred=False,
                    )
                    db.add(binding)

                db.commit()
            finally:
                # Always release the session, even when the query/commit fails.
                db.close()
        except Exception as e:
            logger.warning("Failed to upsert channel binding: %s", e)

    def _log_conversation(self, user_id, channel_type, role, content, agent_id=None):
        """Write a ConversationEntry row.

        ``content`` is truncated to its first 10,000 characters. Failures are
        logged at DEBUG level and swallowed.
        """
        try:
            db = self._get_db()
            try:
                from integrations.social.models import ConversationEntry
                entry = ConversationEntry(
                    user_id=str(user_id),
                    channel_type=channel_type,
                    role=role,
                    content=content[:10000],  # cap at 10k chars
                    agent_id=agent_id,
                )
                db.add(entry)
                db.commit()
            finally:
                db.close()
        except Exception as e:
            logger.debug("Failed to log conversation entry: %s", e)

    @staticmethod
    def _should_skip_binding(binding, exclude_channel=None, exclude_chat_id=None):
        """Return True when ``binding`` must not receive a fan-out copy.

        A binding is skipped when it has no chat id to send to, or when it is
        the originating (channel, chat) pair. Chat ids are compared as strings
        so that an int id in the channel context still matches the same id
        held as a string on the binding; otherwise the origin would receive
        the response twice.
        """
        if not binding.channel_chat_id:
            return True
        if binding.channel_type != exclude_channel:
            return False
        if exclude_chat_id is None:
            return False
        return str(binding.channel_chat_id) == str(exclude_chat_id)

    def _async_fan_out(self, user_id, text, exclude_channel=None, exclude_chat_id=None):
        """Fan-out response to all active bindings (fire-and-forget).

        Active bindings for the user are loaded, ordered preferred-first (then
        by channel type), and each eligible binding gets a send scheduled on
        the event loop via ``asyncio.run_coroutine_threadsafe``. The returned
        futures are not awaited. When no running loop is available the send
        is skipped with a DEBUG log. Failures are logged at WARNING level and
        swallowed.
        """
        try:
            db = self._get_db()
            try:
                from integrations.social.models import UserChannelBinding
                bindings = db.query(UserChannelBinding).filter_by(
                    user_id=str(user_id),
                    is_active=True,
                ).all()

                # Sort: preferred first (False sorts before True, hence `not`)
                bindings.sort(key=lambda b: (not b.is_preferred, b.channel_type))

                registry = self._get_registry()
                # Prefer the loop exposed by the registry, if it has one.
                loop = getattr(registry, '_loop', None) or _get_running_loop()

                for binding in bindings:
                    # Skip the originating channel to avoid double-send, and
                    # bindings that have nowhere to send to.
                    if self._should_skip_binding(
                        binding, exclude_channel, exclude_chat_id
                    ):
                        continue

                    # Schedule async send
                    # NOTE(review): the coroutine reads `binding` attributes
                    # after db.close() below; presumably they are already
                    # loaded on the detached object - confirm.
                    if loop and loop.is_running():
                        asyncio.run_coroutine_threadsafe(
                            self._send_to_binding(registry, binding, text),
                            loop,
                        )
                    else:
                        logger.debug("No event loop for fan-out to %s", binding.channel_type)
            finally:
                db.close()
        except Exception as e:
            logger.warning("Fan-out failed: %s", e)

    @staticmethod
    async def _send_to_binding(registry, binding, text):
        """Send response to a single channel binding.

        Awaits ``registry.send_to_channel`` and logs (at DEBUG) either an
        unsuccessful result or a raised exception; never raises.
        """
        try:
            result = await registry.send_to_channel(
                binding.channel_type,
                binding.channel_chat_id,
                text,
            )
            if not result.success:
                logger.debug("Fan-out to %s failed: %s", binding.channel_type, result.error)
        except Exception as e:
            logger.debug("Fan-out to %s error: %s", binding.channel_type, e)

    def _notify_desktop_wamp(self, user_id, text, channel_type=None):
        """Publish to WAMP for desktop/web notification.

        Singleton accessor — see core.safe_hartos_attr for why workers
        must not eager-import hart_intelligence.

        The same JSON payload (text truncated to 200 characters) is published
        to the user's chat topic and to a dedicated channel-response topic.
        If ``publish_async`` cannot be resolved the notification is skipped.
        Failures are logged at DEBUG level and swallowed.
        """
        try:
            from core.safe_hartos_attr import safe_hartos_attr
            publish_async = safe_hartos_attr('publish_async')
            if publish_async is None:
                logger.debug(
                    "Channel response WAMP notify skipped: HARTOS "
                    "publish_async unresolvable — user=%s channel=%s",
                    user_id, channel_type,
                )
                return
            notification = {
                "text": [text[:200]],
                "priority": 48,
                "action": "ChannelResponse",
                "channel": channel_type or "system",
                "historical_request_id": [],
                "options": [],
                "newoptions": [],
            }
            payload = json.dumps(notification)
            # Primary chat topic (existing desktop/web subscription)
            from core.peer_link.message_bus import chat_topic_for
            publish_async(chat_topic_for(user_id), payload)
            # Dedicated channel response topic (cross-device)
            publish_async(
                f'com.hertzai.hevolve.channel.response.{user_id}',
                payload,
            )
            logger.debug(
                "Channel response WAMP notify published: user=%s channel=%s",
                user_id, channel_type,
            )
        except Exception as e:
            logger.debug(
                "Channel response WAMP notify failed: user=%s err=%s",
                user_id, e,
            )

254 

255 

def _get_running_loop():
    """Return the event loop running in the current thread, or ``None``.

    Uses ``asyncio.get_running_loop()`` rather than
    ``asyncio.get_event_loop()``: the latter is deprecated for this purpose
    and, outside a coroutine, may create a fresh loop that is not running
    instead of reporting that no loop is running.
    """
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # Raised when no loop is running in this thread.
        return None

262 

263 

# Singleton
_router_instance = None


def get_response_router(registry=None) -> ChannelResponseRouter:
    """Get or create the singleton ChannelResponseRouter.

    Args:
        registry: Optional channel registry. It is handed to the router when
            the singleton is first created. If the singleton already exists
            but has not yet resolved a registry, the supplied one is adopted
            instead of being silently dropped; a registry the router already
            holds is never replaced.

    Returns:
        The process-wide ChannelResponseRouter instance.
    """
    global _router_instance
    if _router_instance is None:
        _router_instance = ChannelResponseRouter(registry=registry)
    elif registry is not None and _router_instance._registry is None:
        # A late caller supplied a registry before the lazy default was used.
        _router_instance._registry = registry
    return _router_instance