Coverage for integrations / channels / flask_integration.py: 54.3%
173 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Flask Integration for Channel Adapters
4Integrates the channel registry with the existing Flask API.
5Routes incoming channel messages to the agent system.
6"""
8import asyncio
9import logging
10import os
11import json
12import threading
13from typing import Optional, Dict, Any
14from functools import wraps
16import requests
17from core.http_pool import pooled_post
19from .base import Message, ChannelConfig
20from .registry import ChannelRegistry, ChannelRegistryConfig, get_registry
22logger = logging.getLogger(__name__)
25class FlaskChannelIntegration:
26 """
27 Integrates channel adapters with the Flask-based agent API.
29 This bridges the async channel adapters with the sync Flask app.
30 """
32 def __init__(
33 self,
34 agent_api_url: str = None,
35 default_user_id: int = None,
36 default_prompt_id: int = None,
37 create_mode: bool = False,
38 device_id: str = None,
39 ):
40 from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID
41 if default_user_id is None:
42 default_user_id = DEFAULT_USER_ID
43 if default_prompt_id is None:
44 default_prompt_id = DEFAULT_PROMPT_ID
45 if agent_api_url is None:
46 from core.port_registry import get_port
47 agent_api_url = f"http://localhost:{get_port('backend')}/chat"
48 self.agent_api_url = agent_api_url
49 self.default_user_id = default_user_id
50 self.default_prompt_id = default_prompt_id
51 self.create_mode = create_mode
52 self._device_id = device_id
54 self.registry = get_registry()
55 self.registry.set_agent_handler(self._handle_message)
57 self._loop: Optional[asyncio.AbstractEventLoop] = None
58 self._thread: Optional[threading.Thread] = None
60 # Persistent session manager (LRU cache + JSON persistence + 24h cleanup)
61 from .session_manager import get_session_manager
62 self._session_manager = get_session_manager()
64 # Response router for fan-out, conversation logging, WAMP
65 from .response.router import get_response_router
66 self._response_router = get_response_router(registry=self.registry)
68 # Self-chat handler — owner messaging their own WhatsApp number
69 # becomes a private notebook-to-agent flow (persist + dispatch +
70 # reply-in-thread, no fan-out). Feature gate on the adapter
71 # config: extra.enable_self_chat_agent (default True).
72 from .self_chat import SelfChatHandler
73 self._self_chat = SelfChatHandler(
74 agent_api_url=self.agent_api_url,
75 owner_user_id=self.default_user_id,
76 owner_prompt_id=self.default_prompt_id,
77 device_id=self._device_id,
78 session_manager=self._session_manager,
79 response_router=self._response_router,
80 registry=self.registry,
81 get_loop=lambda: self._loop,
82 )
84 def _handle_message(self, message: Message) -> str:
85 """
86 Handle incoming message from any channel.
88 Routes to Flask API and returns response. Resolves the
89 Hevolve user_id via UserChannelBinding first — the user
90 registered this channel (e.g. WhatsApp +1234) to their
91 Hevolve account via Connect_Channel, and the binding row
92 is the single source of truth for (channel, sender_id) →
93 user_id. Falls back to the session cache and finally the
94 configured default.
95 """
96 try:
97 # Get or create persistent session (replaces plain dict)
98 session = self._session_manager.get_session(
99 message.channel, message.sender_id
100 )
102 # ── Self-chat short-circuit ──────────────────────────
103 # Owner messaging their own number → private notebook-to-
104 # agent flow (persist + dispatch + reply-in-thread, no
105 # fan-out). Gated per-adapter via extra.enable_self_chat_agent.
106 if self._self_chat.is_self_message(message):
107 logger.debug("self-chat from %s", message.sender_id)
108 return self._self_chat.handle(message, session)
110 # ── Resolve user_id ───────────────────────────────────
111 # 1. UserChannelBinding (durable DB row written by
112 # Connect_Channel tool + response router)
113 # 2. Session cache (in-memory per (channel, sender_id))
114 # 3. Configured default
115 # Without step 1, a WhatsApp user who bound their
116 # account via Connect_Channel would still hit the chat
117 # as user_id=10077 (default) and lose access to their
118 # per-user memory / bindings / tool permissions.
119 user_id = self._resolve_user_id_for_sender(
120 channel=message.channel,
121 sender_id=message.sender_id,
122 fallback=(session.user_id if session and session.user_id
123 else self.default_user_id),
124 )
125 # prompt_id priority: session (user override) > per-channel config > global default
126 prompt_id = (
127 (session.prompt_id if session and session.prompt_id else None)
128 or self._get_channel_prompt_id(message.channel)
129 or self.default_prompt_id
130 )
132 # Track message in session history
133 if session:
134 session.add_message('user', message.content)
136 # Skip if group and bot not mentioned (configurable)
137 adapter = self.registry.get(message.channel)
138 if adapter and message.is_group and not message.is_bot_mentioned:
139 if adapter.config.require_mention_in_groups:
140 logger.debug(f"Ignoring group message without mention")
141 return None
143 # Prepare request to agent API
144 payload = {
145 "user_id": user_id,
146 "prompt_id": prompt_id,
147 "prompt": message.content,
148 "create_agent": self.create_mode,
149 "device_id": self._device_id,
150 "channel_context": {
151 "channel": message.channel,
152 "sender_id": message.sender_id,
153 "sender_name": message.sender_name,
154 "chat_id": message.chat_id,
155 "is_group": message.is_group,
156 "message_id": message.id,
157 }
158 }
160 logger.info(f"Routing message from {message.channel}:{message.sender_id} to agent")
162 # Call agent API
163 response = pooled_post(
164 self.agent_api_url,
165 json=payload,
166 timeout=120, # 2 minute timeout for agent processing
167 )
169 if response.status_code == 200:
170 result = response.json()
171 agent_reply = result.get("response", "I processed your request.")
173 # Track response in session history
174 if session:
175 session.add_message('assistant', agent_reply)
177 # Auto-upsert channel binding + log user message
178 self._response_router.upsert_binding(
179 user_id, message.channel, message.sender_id, message.chat_id)
180 self._response_router.log_user_message(
181 user_id, message.channel, message.content)
183 # Route response: WAMP desktop + fan-out to bound channels + log
184 self._response_router.route_response(
185 user_id=user_id,
186 response_text=agent_reply,
187 channel_context=payload.get('channel_context'),
188 fan_out=True,
189 )
191 return agent_reply
192 else:
193 logger.error(f"Agent API error: {response.status_code} - {response.text}")
194 return "Sorry, I encountered an error processing your request."
196 except requests.Timeout:
197 logger.error("Agent API timeout")
198 return "Sorry, the request timed out. Please try again."
199 except Exception as e:
200 logger.error(f"Error handling message: {e}")
201 return "Sorry, an unexpected error occurred."
203 def _resolve_user_id_for_sender(
204 self, channel: str, sender_id: str, fallback,
205 ):
206 """Resolve (channel_type, channel_sender_id) → Hevolve user_id
207 via the UserChannelBinding table.
209 Returns the bound user_id when the user has registered this
210 channel via the Connect_Channel tool, otherwise the provided
211 fallback (session cache or default). The lookup must never
212 raise — binding DB failures log at debug and fall through so
213 message handling is never blocked by a transient DB issue.
214 """
215 if not channel or not sender_id:
216 return fallback
217 try:
218 from integrations.social.models import get_db, UserChannelBinding
219 except ImportError:
220 return fallback
221 try:
222 db = get_db()
223 try:
224 row = db.query(UserChannelBinding).filter_by(
225 channel_type=str(channel).lower(),
226 channel_sender_id=str(sender_id),
227 is_active=True,
228 ).first()
229 if row and row.user_id:
230 logger.debug(
231 f"Channel binding resolved: {channel}:{sender_id} "
232 f"→ user_id={row.user_id}"
233 )
234 return row.user_id
235 finally:
236 try:
237 db.close()
238 except Exception:
239 pass
240 except Exception as e:
241 logger.debug(
242 f"UserChannelBinding lookup failed "
243 f"({channel}:{sender_id}): {e}"
244 )
245 return fallback
247 def _get_channel_prompt_id(self, channel_type: str) -> Optional[int]:
248 """Read per-channel prompt_id from admin config (if set)."""
249 try:
250 from .admin.api import get_api
251 api = get_api()
252 config = api._channels.get(channel_type, {})
253 pid = config.get('prompt_id')
254 return int(pid) if pid else None
255 except Exception:
256 return None
258 # ── Adapter factory import paths ─────────────────────────────
259 # Maps channel_type → (module_path, factory_function_name).
260 # Core adapters live in integrations.channels, extensions in
261 # integrations.channels.extensions, hardware in .hardware.
262 _ADAPTER_FACTORIES: Dict[str, tuple] = {
263 'telegram': ('.telegram_adapter', 'create_telegram_adapter'),
264 'discord': ('.discord_adapter', 'create_discord_adapter'),
265 'whatsapp': ('.whatsapp_adapter', 'create_whatsapp_adapter'),
266 'slack': ('.slack_adapter', 'create_slack_adapter'),
267 'signal': ('.signal_adapter', 'create_signal_adapter'),
268 'imessage': ('.imessage_adapter', 'create_imessage_adapter'),
269 'google_chat': ('.google_chat_adapter', 'create_google_chat_adapter'),
270 'web': ('.web_adapter', 'create_web_adapter'),
271 # Extensions
272 'teams': ('.extensions.teams_adapter', 'create_teams_adapter'),
273 'matrix': ('.extensions.matrix_adapter', 'create_matrix_adapter'),
274 'mattermost': ('.extensions.mattermost_adapter', 'create_mattermost_adapter'),
275 'nextcloud': ('.extensions.nextcloud_adapter', 'create_nextcloud_adapter'),
276 'rocketchat': ('.extensions.rocketchat_adapter', 'create_rocketchat_adapter'),
277 'messenger': ('.extensions.messenger_adapter', 'create_messenger_adapter'),
278 'instagram': ('.extensions.instagram_adapter', 'create_instagram_adapter'),
279 'twitter': ('.extensions.twitter_adapter', 'create_twitter_adapter'),
280 'line': ('.extensions.line_adapter', 'create_line_adapter'),
281 'viber': ('.extensions.viber_adapter', 'create_viber_adapter'),
282 'wechat': ('.extensions.wechat_adapter', 'create_wechat_adapter'),
283 'zalo': ('.extensions.zalo_adapter', 'create_zalo_adapter'),
284 'twitch': ('.extensions.twitch_adapter', 'create_twitch_adapter'),
285 'nostr': ('.extensions.nostr_adapter', 'create_nostr_adapter'),
286 'tlon': ('.extensions.tlon_adapter', 'create_tlon_adapter'),
287 'openprose': ('.extensions.openprose_adapter', 'create_openprose_adapter'),
288 'telegram_user': ('.extensions.telegram_user_adapter', 'create_telegram_user_adapter'),
289 'discord_user': ('.extensions.discord_user_adapter', 'create_discord_user_adapter'),
290 'zalo_user': ('.extensions.zalo_user_adapter', 'create_zalo_user_adapter'),
291 'bluebubbles': ('.extensions.bluebubbles_adapter', 'create_bluebubbles_adapter'),
292 'email': ('.extensions.email_adapter', 'create_email_adapter'),
293 'voice': ('.extensions.voice_adapter', 'create_voice_adapter'),
294 }
296 # Env var fallbacks for token/credential per channel type
297 _ENV_FALLBACKS: Dict[str, str] = {
298 'telegram': 'TELEGRAM_BOT_TOKEN',
299 'discord': 'DISCORD_BOT_TOKEN',
300 'whatsapp': 'WHATSAPP_API_URL',
301 'slack': 'SLACK_BOT_TOKEN',
302 'signal': 'SIGNAL_CLI_URL',
303 'teams': 'TEAMS_BOT_TOKEN',
304 }
306 def register_channel(self, channel_type: str, token: str = None, **kwargs) -> bool:
307 """Register any channel adapter by type.
309 Generic factory — replaces per-channel register_* methods.
310 Falls back to env var if no token provided. Returns True on success.
311 """
312 factory_info = self._ADAPTER_FACTORIES.get(channel_type)
313 if not factory_info:
314 logger.warning(f"Unknown channel type: {channel_type}")
315 return False
317 module_path, factory_name = factory_info
318 token = token or os.getenv(self._ENV_FALLBACKS.get(channel_type, ''))
319 if not token and channel_type not in ('web', 'imessage', 'openprose'):
320 # web/imessage/openprose don't need external tokens
321 logger.warning(f"{channel_type} token not provided, skipping")
322 return False
324 try:
325 import importlib
326 mod = importlib.import_module(module_path, package='integrations.channels')
327 factory_fn = getattr(mod, factory_name)
328 if token:
329 adapter = factory_fn(token=token, **kwargs)
330 else:
331 adapter = factory_fn(**kwargs)
332 self.registry.register(adapter)
333 logger.info(f"{channel_type} adapter registered")
334 return True
335 except Exception as e:
336 logger.warning(f"{channel_type} adapter registration failed: {e}")
337 return False
339 # Keep legacy methods as thin delegates for backward compat
340 def register_telegram(self, token: str = None, **kwargs) -> None:
341 self.register_channel('telegram', token=token, **kwargs)
343 def register_discord(self, token: str = None, **kwargs) -> None:
344 self.register_channel('discord', token=token, **kwargs)
346 def register_whatsapp(self, api_url: str = None, **kwargs) -> None:
347 self.register_channel('whatsapp', token=api_url, **kwargs)
349 def set_user_session(
350 self,
351 channel: str,
352 sender_id: str,
353 user_id: int,
354 prompt_id: int,
355 ) -> None:
356 """Set user session mapping for a channel sender."""
357 session = self._session_manager.get_session(channel, sender_id, user_id=user_id, prompt_id=prompt_id)
359 def _run_async_loop(self) -> None:
360 """Run asyncio event loop in background thread."""
361 self._loop = asyncio.new_event_loop()
362 asyncio.set_event_loop(self._loop)
364 try:
365 self._loop.run_until_complete(self.registry.start_all())
366 self._loop.run_forever()
367 finally:
368 self._loop.run_until_complete(self.registry.stop_all())
369 self._loop.close()
371 def start(self) -> None:
372 """Start all channel adapters in background thread."""
373 if self._thread and self._thread.is_alive():
374 logger.warning("Channels already running")
375 return
377 self._thread = threading.Thread(target=self._run_async_loop, daemon=True)
378 self._thread.start()
379 logger.info("Channel adapters started in background")
381 def stop(self) -> None:
382 """Stop all channel adapters."""
383 if self._loop:
384 self._loop.call_soon_threadsafe(self._loop.stop)
386 if self._thread:
387 self._thread.join(timeout=5)
389 logger.info("Channel adapters stopped")
391 def get_status(self) -> Dict[str, str]:
392 """Get status of all channels."""
393 return {
394 name: status.value
395 for name, status in self.registry.get_status().items()
396 }
399# Global integration instance
400_integration: Optional[FlaskChannelIntegration] = None
403def get_channel_integration() -> FlaskChannelIntegration:
404 """Get or create the global channel integration."""
405 global _integration
406 if _integration is None:
407 _integration = FlaskChannelIntegration()
408 return _integration
411def init_channels(app=None, config: Dict[str, Any] = None) -> FlaskChannelIntegration:
412 """
413 Initialize channel integrations.
415 Call this from your Flask app startup:
417 from integrations.channels.flask_integration import init_channels
419 app = Flask(__name__)
420 channels = init_channels(app)
421 channels.register_telegram()
422 channels.start()
424 Args:
425 app: Flask app instance (optional)
426 config: Configuration dict (optional)
428 Returns:
429 FlaskChannelIntegration instance
430 """
431 config = config or {}
432 from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID
434 integration = FlaskChannelIntegration(
435 agent_api_url=config.get("agent_api_url", "http://localhost:6777/chat"),
436 default_user_id=config.get("default_user_id", DEFAULT_USER_ID),
437 default_prompt_id=config.get("default_prompt_id", DEFAULT_PROMPT_ID),
438 create_mode=config.get("create_mode", False),
439 device_id=config.get("device_id"),
440 )
442 global _integration
443 _integration = integration
445 # Add Flask routes if app provided
446 if app:
447 @app.route("/channels/status", methods=["GET"])
448 def channel_status():
449 return integration.get_status()
451 @app.route("/channels/send", methods=["POST"])
452 def channel_send():
453 from flask import request, jsonify
455 data = request.json
456 channel = data.get("channel")
457 chat_id = data.get("chat_id")
458 text = data.get("text")
460 if not all([channel, chat_id, text]):
461 return jsonify({"error": "Missing required fields"}), 400
463 # Run async send in the event loop
464 if integration._loop:
465 future = asyncio.run_coroutine_threadsafe(
466 integration.registry.send_to_channel(channel, chat_id, text),
467 integration._loop,
468 )
469 result = future.result(timeout=30)
470 return jsonify({
471 "success": result.success,
472 "message_id": result.message_id,
473 "error": result.error,
474 })
475 else:
476 return jsonify({"error": "Channels not running"}), 503
478 return integration