Coverage for integrations / channels / self_chat.py: 79.8%
89 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Self-chat handler — treat an owner's "Message Yourself" thread
2as a private notebook-to-agent channel.
4When the Nunba owner messages their own WhatsApp number (sender_id ==
5owner_phone), we skip UserChannelBinding lookup and pairing flow
6entirely, persist the message to the MemoryGraph, dispatch to the
7owner's default agent, and reply in the same thread. Replies are NOT
8fanned out to other bound channels — the self-chat is private by
9design.
11CLAUDE.md gates:
12 * Gate 2 (DRY): uses existing MemoryGraph + response router; no
13 parallel memory or send path is introduced.
14 * Gate 3 (SRP): detection + routing only. Does NOT own the chat
15 pipeline, the agent API, or WAMP push — it calls into them.
16 * Gate 4 (no parallel paths): one caller (FlaskChannelIntegration).
17"""
18from __future__ import annotations
20import asyncio
21import logging
22import re
23from typing import Optional, TYPE_CHECKING
25import requests
26from core.http_pool import pooled_post
28if TYPE_CHECKING: # avoid runtime import cycle
29 from .base import Message
30 from .registry import ChannelRegistry
31 from .session_manager import SessionManager
32 from .response.router import ChannelResponseRouter
logger = logging.getLogger(__name__)


# Pieces removed when canonicalising a phone number / JID:
#   * "+", "-", whitespace, parentheses and dots — visual separators that
#     humans put into phone numbers ("+1 (555) 123-4567", "555.123.4567");
#   * a trailing "@<server>" JID suffix ("@c.us", "@s.whatsapp.net"),
#     optionally followed by stray whitespace.
# Any other character is left in place on purpose, so identifiers that are
# not phone numbers still compare by value instead of collapsing to "".
_PHONE_STRIP = re.compile(r"[+\-\s().]|@[a-z.]+\s*$", re.IGNORECASE)


def normalize_phone(s: Optional[str]) -> str:
    """Reduce a phone number / JID to a canonical form for equality checks.

    Separators and a trailing JID server suffix are removed; for a
    well-formed phone number or JID the result is the bare digit string.

    Examples:
        "+1 555 123 4567"            -> "15551234567"
        "+1 (555) 123-4567"          -> "15551234567"
        "15551234567@c.us"           -> "15551234567"
        "15551234567@s.whatsapp.net" -> "15551234567"

    Args:
        s: Phone number or JID. Non-string values are converted with
            ``str()`` first; ``None`` and other falsy values are accepted.

    Returns:
        The canonical string, or ``""`` when ``s`` is falsy.
    """
    if not s:
        return ""
    return _PHONE_STRIP.sub("", str(s)).strip()
class SelfChatHandler:
    """Route an owner's self-chat messages to a private notebook-to-agent flow.

    The handler recognises messages whose sender is the owner's own
    number, stores them as notes, forwards them to the agent API and
    replies in the originating thread only.

    Usage from FlaskChannelIntegration._handle_message::

        if self._self_chat.is_self_message(message):
            return self._self_chat.handle(message, session)
    """

    def __init__(
        self,
        *,
        agent_api_url: str,
        owner_user_id: int,
        owner_prompt_id: int,
        device_id: Optional[str],
        session_manager: "SessionManager",
        response_router: "ChannelResponseRouter",
        registry: "ChannelRegistry",
        memory_user_id: Optional[str] = None,
        get_loop=None,
    ) -> None:
        """Store the collaborators; performs no I/O.

        Args:
            agent_api_url: URL the agent request is POSTed to.
            owner_user_id: Sent as ``user_id`` in the agent payload.
            owner_prompt_id: Sent as ``prompt_id`` in the agent payload.
            device_id: Sent as ``device_id`` in the agent payload.
            session_manager: Kept on the instance; not used by the
                methods of this class.
            response_router: Used for ``log_user_message``.
            registry: Channel registry used to look up adapters and to
                send the reply.
            memory_user_id: MemoryGraph user key; defaults to
                ``str(owner_user_id)``.
            get_loop: Zero-argument callable returning the asyncio loop
                that owns the adapters, or ``None`` when there is none.
        """
        self.agent_api_url = agent_api_url
        self.owner_user_id = owner_user_id
        self.owner_prompt_id = owner_prompt_id
        self.device_id = device_id
        self.session_manager = session_manager
        self.response_router = response_router
        self.registry = registry
        # MemoryGraph user key (defaults to owner user_id stringified)
        self.memory_user_id = memory_user_id or str(owner_user_id)
        # Callable returning the async loop that owns the adapters
        # (FlaskChannelIntegration._loop). Deferred because the loop
        # is spawned in a daemon thread after __init__.
        self._get_loop = get_loop or (lambda: None)

    # ── Detection ─────────────────────────────────────────────────
    def is_self_message(self, message: "Message") -> bool:
        """Tell whether ``message`` was sent by the owner to themselves.

        True iff the adapter for ``message.channel`` exists, the feature
        is not disabled on it (``config.extra['enable_self_chat_agent']``,
        default True), an owner phone is configured
        (``extra['owner_phone']`` or ``extra['phone_number']``) and the
        normalised sender equals the normalised owner phone.
        """
        adapter = self.registry.get(message.channel)
        if adapter is None:
            return False
        # Tolerate adapters without a config object or without `extra`.
        config = getattr(adapter, "config", None)
        extra = getattr(config, "extra", None) or {}
        if extra.get("enable_self_chat_agent", True) is False:
            return False
        owner = extra.get("owner_phone") or extra.get("phone_number")
        if not owner:
            return False
        sender = normalize_phone(message.sender_id)
        # An id that normalises to "" (missing, or punctuation only) must
        # never match: two empty strings comparing equal would wrongly
        # route the message into the private flow.
        return bool(sender) and sender == normalize_phone(owner)

    # ── Handling ──────────────────────────────────────────────────
    def handle(self, message: "Message", session) -> Optional[str]:
        """Persist → dispatch → reply in-thread.

        Args:
            message: The incoming self-chat message.
            session: Conversation session exposing ``add_message``, or
                ``None`` to skip history tracking.

        Returns:
            The reply text, which has already been handed to
            ``registry.send_to_channel`` for delivery. On an agent
            timeout the placeholder ``"(timed out — try again)"`` is
            returned instead. ``None`` on a fatal routing failure (agent
            call raised, or returned a non-200 status) so the outer
            adapter can log + surface a generic error to the user.
        """
        # 1. Persist to MemoryGraph (best-effort; never block routing)
        self._persist_note(message)

        # 2. Track in session history (same as normal path)
        if session is not None:
            try:
                session.add_message("user", message.content)
            except Exception:  # noqa: BLE001
                logger.debug("self-chat session.add_message failed", exc_info=True)

        # 3. Dispatch to agent API with is_self_chat marker
        payload = {
            "user_id": self.owner_user_id,
            "prompt_id": self.owner_prompt_id,
            "prompt": message.content,
            "create_agent": False,  # self-chat never needs agent creation
            "device_id": self.device_id,
            "channel_context": {
                "channel": message.channel,
                "sender_id": message.sender_id,
                "sender_name": message.sender_name,
                "chat_id": message.chat_id,
                "is_group": message.is_group,
                "is_self_chat": True,  # lets chat hot path apply notebook heuristics
                "message_id": message.id,
            },
        }
        try:
            resp = pooled_post(self.agent_api_url, json=payload, timeout=120)
        except requests.Timeout:
            logger.error("self-chat agent timeout")
            # NOTE(review): this text is returned but NOT sent in-thread
            # here — confirm the caller delivers it to the user.
            return "(timed out — try again)"
        except Exception as e:  # noqa: BLE001
            logger.error("self-chat agent call failed: %s", e)
            return None

        if resp.status_code != 200:
            logger.error("self-chat agent %s: %s", resp.status_code, resp.text[:200])
            return None

        try:
            reply = (resp.json() or {}).get("response") or ""
        except Exception:  # noqa: BLE001
            # Unparseable or unexpectedly shaped body: fall back to the ack.
            reply = ""
        reply = reply or "✓ noted"

        # 4. Track assistant turn + log user message (no fan-out: private)
        if session is not None:
            try:
                session.add_message("assistant", reply)
            except Exception:  # noqa: BLE001
                logger.debug("self-chat session.add_message assistant failed",
                             exc_info=True)

        try:
            self.response_router.log_user_message(
                self.owner_user_id, message.channel, message.content,
            )
        except Exception:  # noqa: BLE001
            logger.debug("self-chat log_user_message failed", exc_info=True)

        # 5. Reply in the same thread the note came from. Only the
        #    originating channel is addressed: the self-chat is private
        #    and must not leak to other channels the owner has bound
        #    (Telegram, Discord…).
        self._send_reply_in_thread(message, reply)
        return reply

    # ── Internals ─────────────────────────────────────────────────
    def _persist_note(self, message: "Message") -> None:
        """Write the note to MemoryGraph with memory_type='self_note'.

        Best-effort: a missing MemoryGraph module or any failure while
        registering the note is logged at debug level and swallowed so
        that routing is never blocked.
        """
        try:
            from core.platform_paths import get_memory_graph_dir
            from integrations.memory.memory_graph import MemoryGraph  # type: ignore
        except Exception:  # noqa: BLE001
            logger.debug("self-chat persist: MemoryGraph unavailable", exc_info=True)
            return

        try:
            session_key = f"self_chat:{self.memory_user_id}"
            db_path = get_memory_graph_dir(session_key)
            mg = MemoryGraph(db_path=db_path, user_id=self.memory_user_id)
            mg.register(
                content=message.content,
                metadata={
                    "memory_type": "self_note",
                    "source_agent": "self_chat",
                    "session_id": session_key,
                    "channel": message.channel,
                    "message_id": message.id,
                },
                context_snapshot=f"Self-chat note via {message.channel}",
            )
        except Exception as e:  # noqa: BLE001
            logger.debug("self-chat persist failed: %s", e)

    @staticmethod
    def _log_send_outcome(future) -> None:
        """Done-callback for a scheduled send: log cancellation or failure."""
        if future.cancelled():
            logger.warning("self-chat send cancelled before delivery")
            return
        exc = future.exception()
        if exc is not None:
            logger.error("self-chat send failed: %s", exc)

    def _send_reply_in_thread(self, message: "Message", reply: str) -> None:
        """Send ``reply`` via ``registry.send_to_channel`` onto the
        integration's asyncio loop. No fan-out — this is a private thread.

        The target is ``message.chat_id``, falling back to
        ``message.sender_id``. When the loop returned by ``get_loop`` is
        running, the send is scheduled on it and its outcome is logged
        from a done-callback; otherwise a short-lived loop is used.
        Failures are logged, never raised.
        """
        adapter = self.registry.get(message.channel)
        if adapter is None:
            logger.warning("self-chat: adapter '%s' gone", message.channel)
            return
        chat_id = message.chat_id or message.sender_id
        coro = self.registry.send_to_channel(message.channel, chat_id, reply)

        loop = self._get_loop()
        if loop is not None and loop.is_running():
            try:
                future = asyncio.run_coroutine_threadsafe(coro, loop)
            except Exception as e:  # noqa: BLE001
                logger.error("self-chat schedule on loop failed: %s", e)
            else:
                # Without this callback an exception raised by the send
                # would stay inside the discarded future and never be seen.
                future.add_done_callback(self._log_send_outcome)
                return

        # Fallback: no running adapter loop — create a short-lived one.
        # Rare path; normal Nunba boot attaches a loop via
        # FlaskChannelIntegration.start().
        try:
            asyncio.run(coro)
        except RuntimeError:
            logger.error("self-chat send dropped — no async loop available")
            # asyncio.run() refuses to start inside a running loop and
            # then leaves the coroutine un-awaited; close it explicitly.
            # Closing an already finished coroutine is a no-op.
            close = getattr(coro, "close", None)
            if callable(close):
                close()
        except Exception as e:  # noqa: BLE001
            logger.error("self-chat direct-send failed: %s", e)