Coverage for integrations / channels / flask_integration.py: 54.3%

173 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Flask Integration for Channel Adapters 

3 

4Integrates the channel registry with the existing Flask API. 

5Routes incoming channel messages to the agent system. 

6""" 

7 

8import asyncio 

9import logging 

10import os 

11import json 

12import threading 

13from typing import Optional, Dict, Any 

14from functools import wraps 

15 

16import requests 

17from core.http_pool import pooled_post 

18 

19from .base import Message, ChannelConfig 

20from .registry import ChannelRegistry, ChannelRegistryConfig, get_registry 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25class FlaskChannelIntegration: 

26 """ 

27 Integrates channel adapters with the Flask-based agent API. 

28 

29 This bridges the async channel adapters with the sync Flask app. 

30 """ 

31 

32 def __init__( 

33 self, 

34 agent_api_url: str = None, 

35 default_user_id: int = None, 

36 default_prompt_id: int = None, 

37 create_mode: bool = False, 

38 device_id: str = None, 

39 ): 

40 from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID 

41 if default_user_id is None: 

42 default_user_id = DEFAULT_USER_ID 

43 if default_prompt_id is None: 

44 default_prompt_id = DEFAULT_PROMPT_ID 

45 if agent_api_url is None: 

46 from core.port_registry import get_port 

47 agent_api_url = f"http://localhost:{get_port('backend')}/chat" 

48 self.agent_api_url = agent_api_url 

49 self.default_user_id = default_user_id 

50 self.default_prompt_id = default_prompt_id 

51 self.create_mode = create_mode 

52 self._device_id = device_id 

53 

54 self.registry = get_registry() 

55 self.registry.set_agent_handler(self._handle_message) 

56 

57 self._loop: Optional[asyncio.AbstractEventLoop] = None 

58 self._thread: Optional[threading.Thread] = None 

59 

60 # Persistent session manager (LRU cache + JSON persistence + 24h cleanup) 

61 from .session_manager import get_session_manager 

62 self._session_manager = get_session_manager() 

63 

64 # Response router for fan-out, conversation logging, WAMP 

65 from .response.router import get_response_router 

66 self._response_router = get_response_router(registry=self.registry) 

67 

68 # Self-chat handler — owner messaging their own WhatsApp number 

69 # becomes a private notebook-to-agent flow (persist + dispatch + 

70 # reply-in-thread, no fan-out). Feature gate on the adapter 

71 # config: extra.enable_self_chat_agent (default True). 

72 from .self_chat import SelfChatHandler 

73 self._self_chat = SelfChatHandler( 

74 agent_api_url=self.agent_api_url, 

75 owner_user_id=self.default_user_id, 

76 owner_prompt_id=self.default_prompt_id, 

77 device_id=self._device_id, 

78 session_manager=self._session_manager, 

79 response_router=self._response_router, 

80 registry=self.registry, 

81 get_loop=lambda: self._loop, 

82 ) 

83 

84 def _handle_message(self, message: Message) -> str: 

85 """ 

86 Handle incoming message from any channel. 

87 

88 Routes to Flask API and returns response. Resolves the 

89 Hevolve user_id via UserChannelBinding first — the user 

90 registered this channel (e.g. WhatsApp +1234) to their 

91 Hevolve account via Connect_Channel, and the binding row 

92 is the single source of truth for (channel, sender_id) → 

93 user_id. Falls back to the session cache and finally the 

94 configured default. 

95 """ 

96 try: 

97 # Get or create persistent session (replaces plain dict) 

98 session = self._session_manager.get_session( 

99 message.channel, message.sender_id 

100 ) 

101 

102 # ── Self-chat short-circuit ────────────────────────── 

103 # Owner messaging their own number → private notebook-to- 

104 # agent flow (persist + dispatch + reply-in-thread, no 

105 # fan-out). Gated per-adapter via extra.enable_self_chat_agent. 

106 if self._self_chat.is_self_message(message): 

107 logger.debug("self-chat from %s", message.sender_id) 

108 return self._self_chat.handle(message, session) 

109 

110 # ── Resolve user_id ─────────────────────────────────── 

111 # 1. UserChannelBinding (durable DB row written by 

112 # Connect_Channel tool + response router) 

113 # 2. Session cache (in-memory per (channel, sender_id)) 

114 # 3. Configured default 

115 # Without step 1, a WhatsApp user who bound their 

116 # account via Connect_Channel would still hit the chat 

117 # as user_id=10077 (default) and lose access to their 

118 # per-user memory / bindings / tool permissions. 

119 user_id = self._resolve_user_id_for_sender( 

120 channel=message.channel, 

121 sender_id=message.sender_id, 

122 fallback=(session.user_id if session and session.user_id 

123 else self.default_user_id), 

124 ) 

125 # prompt_id priority: session (user override) > per-channel config > global default 

126 prompt_id = ( 

127 (session.prompt_id if session and session.prompt_id else None) 

128 or self._get_channel_prompt_id(message.channel) 

129 or self.default_prompt_id 

130 ) 

131 

132 # Track message in session history 

133 if session: 

134 session.add_message('user', message.content) 

135 

136 # Skip if group and bot not mentioned (configurable) 

137 adapter = self.registry.get(message.channel) 

138 if adapter and message.is_group and not message.is_bot_mentioned: 

139 if adapter.config.require_mention_in_groups: 

140 logger.debug(f"Ignoring group message without mention") 

141 return None 

142 

143 # Prepare request to agent API 

144 payload = { 

145 "user_id": user_id, 

146 "prompt_id": prompt_id, 

147 "prompt": message.content, 

148 "create_agent": self.create_mode, 

149 "device_id": self._device_id, 

150 "channel_context": { 

151 "channel": message.channel, 

152 "sender_id": message.sender_id, 

153 "sender_name": message.sender_name, 

154 "chat_id": message.chat_id, 

155 "is_group": message.is_group, 

156 "message_id": message.id, 

157 } 

158 } 

159 

160 logger.info(f"Routing message from {message.channel}:{message.sender_id} to agent") 

161 

162 # Call agent API 

163 response = pooled_post( 

164 self.agent_api_url, 

165 json=payload, 

166 timeout=120, # 2 minute timeout for agent processing 

167 ) 

168 

169 if response.status_code == 200: 

170 result = response.json() 

171 agent_reply = result.get("response", "I processed your request.") 

172 

173 # Track response in session history 

174 if session: 

175 session.add_message('assistant', agent_reply) 

176 

177 # Auto-upsert channel binding + log user message 

178 self._response_router.upsert_binding( 

179 user_id, message.channel, message.sender_id, message.chat_id) 

180 self._response_router.log_user_message( 

181 user_id, message.channel, message.content) 

182 

183 # Route response: WAMP desktop + fan-out to bound channels + log 

184 self._response_router.route_response( 

185 user_id=user_id, 

186 response_text=agent_reply, 

187 channel_context=payload.get('channel_context'), 

188 fan_out=True, 

189 ) 

190 

191 return agent_reply 

192 else: 

193 logger.error(f"Agent API error: {response.status_code} - {response.text}") 

194 return "Sorry, I encountered an error processing your request." 

195 

196 except requests.Timeout: 

197 logger.error("Agent API timeout") 

198 return "Sorry, the request timed out. Please try again." 

199 except Exception as e: 

200 logger.error(f"Error handling message: {e}") 

201 return "Sorry, an unexpected error occurred." 

202 

203 def _resolve_user_id_for_sender( 

204 self, channel: str, sender_id: str, fallback, 

205 ): 

206 """Resolve (channel_type, channel_sender_id) → Hevolve user_id 

207 via the UserChannelBinding table. 

208 

209 Returns the bound user_id when the user has registered this 

210 channel via the Connect_Channel tool, otherwise the provided 

211 fallback (session cache or default). The lookup must never 

212 raise — binding DB failures log at debug and fall through so 

213 message handling is never blocked by a transient DB issue. 

214 """ 

215 if not channel or not sender_id: 

216 return fallback 

217 try: 

218 from integrations.social.models import get_db, UserChannelBinding 

219 except ImportError: 

220 return fallback 

221 try: 

222 db = get_db() 

223 try: 

224 row = db.query(UserChannelBinding).filter_by( 

225 channel_type=str(channel).lower(), 

226 channel_sender_id=str(sender_id), 

227 is_active=True, 

228 ).first() 

229 if row and row.user_id: 

230 logger.debug( 

231 f"Channel binding resolved: {channel}:{sender_id} " 

232 f"→ user_id={row.user_id}" 

233 ) 

234 return row.user_id 

235 finally: 

236 try: 

237 db.close() 

238 except Exception: 

239 pass 

240 except Exception as e: 

241 logger.debug( 

242 f"UserChannelBinding lookup failed " 

243 f"({channel}:{sender_id}): {e}" 

244 ) 

245 return fallback 

246 

247 def _get_channel_prompt_id(self, channel_type: str) -> Optional[int]: 

248 """Read per-channel prompt_id from admin config (if set).""" 

249 try: 

250 from .admin.api import get_api 

251 api = get_api() 

252 config = api._channels.get(channel_type, {}) 

253 pid = config.get('prompt_id') 

254 return int(pid) if pid else None 

255 except Exception: 

256 return None 

257 

258 # ── Adapter factory import paths ───────────────────────────── 

259 # Maps channel_type → (module_path, factory_function_name). 

260 # Core adapters live in integrations.channels, extensions in 

261 # integrations.channels.extensions, hardware in .hardware. 

262 _ADAPTER_FACTORIES: Dict[str, tuple] = { 

263 'telegram': ('.telegram_adapter', 'create_telegram_adapter'), 

264 'discord': ('.discord_adapter', 'create_discord_adapter'), 

265 'whatsapp': ('.whatsapp_adapter', 'create_whatsapp_adapter'), 

266 'slack': ('.slack_adapter', 'create_slack_adapter'), 

267 'signal': ('.signal_adapter', 'create_signal_adapter'), 

268 'imessage': ('.imessage_adapter', 'create_imessage_adapter'), 

269 'google_chat': ('.google_chat_adapter', 'create_google_chat_adapter'), 

270 'web': ('.web_adapter', 'create_web_adapter'), 

271 # Extensions 

272 'teams': ('.extensions.teams_adapter', 'create_teams_adapter'), 

273 'matrix': ('.extensions.matrix_adapter', 'create_matrix_adapter'), 

274 'mattermost': ('.extensions.mattermost_adapter', 'create_mattermost_adapter'), 

275 'nextcloud': ('.extensions.nextcloud_adapter', 'create_nextcloud_adapter'), 

276 'rocketchat': ('.extensions.rocketchat_adapter', 'create_rocketchat_adapter'), 

277 'messenger': ('.extensions.messenger_adapter', 'create_messenger_adapter'), 

278 'instagram': ('.extensions.instagram_adapter', 'create_instagram_adapter'), 

279 'twitter': ('.extensions.twitter_adapter', 'create_twitter_adapter'), 

280 'line': ('.extensions.line_adapter', 'create_line_adapter'), 

281 'viber': ('.extensions.viber_adapter', 'create_viber_adapter'), 

282 'wechat': ('.extensions.wechat_adapter', 'create_wechat_adapter'), 

283 'zalo': ('.extensions.zalo_adapter', 'create_zalo_adapter'), 

284 'twitch': ('.extensions.twitch_adapter', 'create_twitch_adapter'), 

285 'nostr': ('.extensions.nostr_adapter', 'create_nostr_adapter'), 

286 'tlon': ('.extensions.tlon_adapter', 'create_tlon_adapter'), 

287 'openprose': ('.extensions.openprose_adapter', 'create_openprose_adapter'), 

288 'telegram_user': ('.extensions.telegram_user_adapter', 'create_telegram_user_adapter'), 

289 'discord_user': ('.extensions.discord_user_adapter', 'create_discord_user_adapter'), 

290 'zalo_user': ('.extensions.zalo_user_adapter', 'create_zalo_user_adapter'), 

291 'bluebubbles': ('.extensions.bluebubbles_adapter', 'create_bluebubbles_adapter'), 

292 'email': ('.extensions.email_adapter', 'create_email_adapter'), 

293 'voice': ('.extensions.voice_adapter', 'create_voice_adapter'), 

294 } 

295 

296 # Env var fallbacks for token/credential per channel type 

297 _ENV_FALLBACKS: Dict[str, str] = { 

298 'telegram': 'TELEGRAM_BOT_TOKEN', 

299 'discord': 'DISCORD_BOT_TOKEN', 

300 'whatsapp': 'WHATSAPP_API_URL', 

301 'slack': 'SLACK_BOT_TOKEN', 

302 'signal': 'SIGNAL_CLI_URL', 

303 'teams': 'TEAMS_BOT_TOKEN', 

304 } 

305 

306 def register_channel(self, channel_type: str, token: str = None, **kwargs) -> bool: 

307 """Register any channel adapter by type. 

308 

309 Generic factory — replaces per-channel register_* methods. 

310 Falls back to env var if no token provided. Returns True on success. 

311 """ 

312 factory_info = self._ADAPTER_FACTORIES.get(channel_type) 

313 if not factory_info: 

314 logger.warning(f"Unknown channel type: {channel_type}") 

315 return False 

316 

317 module_path, factory_name = factory_info 

318 token = token or os.getenv(self._ENV_FALLBACKS.get(channel_type, '')) 

319 if not token and channel_type not in ('web', 'imessage', 'openprose'): 

320 # web/imessage/openprose don't need external tokens 

321 logger.warning(f"{channel_type} token not provided, skipping") 

322 return False 

323 

324 try: 

325 import importlib 

326 mod = importlib.import_module(module_path, package='integrations.channels') 

327 factory_fn = getattr(mod, factory_name) 

328 if token: 

329 adapter = factory_fn(token=token, **kwargs) 

330 else: 

331 adapter = factory_fn(**kwargs) 

332 self.registry.register(adapter) 

333 logger.info(f"{channel_type} adapter registered") 

334 return True 

335 except Exception as e: 

336 logger.warning(f"{channel_type} adapter registration failed: {e}") 

337 return False 

338 

339 # Keep legacy methods as thin delegates for backward compat 

340 def register_telegram(self, token: str = None, **kwargs) -> None: 

341 self.register_channel('telegram', token=token, **kwargs) 

342 

343 def register_discord(self, token: str = None, **kwargs) -> None: 

344 self.register_channel('discord', token=token, **kwargs) 

345 

346 def register_whatsapp(self, api_url: str = None, **kwargs) -> None: 

347 self.register_channel('whatsapp', token=api_url, **kwargs) 

348 

349 def set_user_session( 

350 self, 

351 channel: str, 

352 sender_id: str, 

353 user_id: int, 

354 prompt_id: int, 

355 ) -> None: 

356 """Set user session mapping for a channel sender.""" 

357 session = self._session_manager.get_session(channel, sender_id, user_id=user_id, prompt_id=prompt_id) 

358 

359 def _run_async_loop(self) -> None: 

360 """Run asyncio event loop in background thread.""" 

361 self._loop = asyncio.new_event_loop() 

362 asyncio.set_event_loop(self._loop) 

363 

364 try: 

365 self._loop.run_until_complete(self.registry.start_all()) 

366 self._loop.run_forever() 

367 finally: 

368 self._loop.run_until_complete(self.registry.stop_all()) 

369 self._loop.close() 

370 

371 def start(self) -> None: 

372 """Start all channel adapters in background thread.""" 

373 if self._thread and self._thread.is_alive(): 

374 logger.warning("Channels already running") 

375 return 

376 

377 self._thread = threading.Thread(target=self._run_async_loop, daemon=True) 

378 self._thread.start() 

379 logger.info("Channel adapters started in background") 

380 

381 def stop(self) -> None: 

382 """Stop all channel adapters.""" 

383 if self._loop: 

384 self._loop.call_soon_threadsafe(self._loop.stop) 

385 

386 if self._thread: 

387 self._thread.join(timeout=5) 

388 

389 logger.info("Channel adapters stopped") 

390 

391 def get_status(self) -> Dict[str, str]: 

392 """Get status of all channels.""" 

393 return { 

394 name: status.value 

395 for name, status in self.registry.get_status().items() 

396 } 

397 

398 

399# Global integration instance 

400_integration: Optional[FlaskChannelIntegration] = None 

401 

402 

403def get_channel_integration() -> FlaskChannelIntegration: 

404 """Get or create the global channel integration.""" 

405 global _integration 

406 if _integration is None: 

407 _integration = FlaskChannelIntegration() 

408 return _integration 

409 

410 

411def init_channels(app=None, config: Dict[str, Any] = None) -> FlaskChannelIntegration: 

412 """ 

413 Initialize channel integrations. 

414 

415 Call this from your Flask app startup: 

416 

417 from integrations.channels.flask_integration import init_channels 

418 

419 app = Flask(__name__) 

420 channels = init_channels(app) 

421 channels.register_telegram() 

422 channels.start() 

423 

424 Args: 

425 app: Flask app instance (optional) 

426 config: Configuration dict (optional) 

427 

428 Returns: 

429 FlaskChannelIntegration instance 

430 """ 

431 config = config or {} 

432 from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID 

433 

434 integration = FlaskChannelIntegration( 

435 agent_api_url=config.get("agent_api_url", "http://localhost:6777/chat"), 

436 default_user_id=config.get("default_user_id", DEFAULT_USER_ID), 

437 default_prompt_id=config.get("default_prompt_id", DEFAULT_PROMPT_ID), 

438 create_mode=config.get("create_mode", False), 

439 device_id=config.get("device_id"), 

440 ) 

441 

442 global _integration 

443 _integration = integration 

444 

445 # Add Flask routes if app provided 

446 if app: 

447 @app.route("/channels/status", methods=["GET"]) 

448 def channel_status(): 

449 return integration.get_status() 

450 

451 @app.route("/channels/send", methods=["POST"]) 

452 def channel_send(): 

453 from flask import request, jsonify 

454 

455 data = request.json 

456 channel = data.get("channel") 

457 chat_id = data.get("chat_id") 

458 text = data.get("text") 

459 

460 if not all([channel, chat_id, text]): 

461 return jsonify({"error": "Missing required fields"}), 400 

462 

463 # Run async send in the event loop 

464 if integration._loop: 

465 future = asyncio.run_coroutine_threadsafe( 

466 integration.registry.send_to_channel(channel, chat_id, text), 

467 integration._loop, 

468 ) 

469 result = future.result(timeout=30) 

470 return jsonify({ 

471 "success": result.success, 

472 "message_id": result.message_id, 

473 "error": result.error, 

474 }) 

475 else: 

476 return jsonify({"error": "Channels not running"}), 503 

477 

478 return integration