Coverage for integrations / channels / self_chat.py: 79.8%

89 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Self-chat handler — treat an owner's "Message Yourself" thread 

2as a private notebook-to-agent channel. 

3 

4When the Nunba owner messages their own WhatsApp number (sender_id == 

5owner_phone), we skip UserChannelBinding lookup and pairing flow 

6entirely, persist the message to the MemoryGraph, dispatch to the 

7owner's default agent, and reply in the same thread. Replies are NOT 

8fanned out to other bound channels — the self-chat is private by 

9design. 

10 

11CLAUDE.md gates: 

12 * Gate 2 (DRY): uses existing MemoryGraph + response router; no 

13 parallel memory or send path is introduced. 

14 * Gate 3 (SRP): detection + routing only. Does NOT own the chat 

15 pipeline, the agent API, or WAMP push — it calls into them. 

16 * Gate 4 (no parallel paths): one caller (FlaskChannelIntegration). 

17""" 

18from __future__ import annotations 

19 

20import asyncio 

21import logging 

22import re 

23from typing import Optional, TYPE_CHECKING 

24 

25import requests 

26from core.http_pool import pooled_post 

27 

28if TYPE_CHECKING: # avoid runtime import cycle 

29 from .base import Message 

30 from .registry import ChannelRegistry 

31 from .session_manager import SessionManager 

32 from .response.router import ChannelResponseRouter 

33 

34logger = logging.getLogger(__name__) 

35 

36 

37# E.164 digits only. Strips "+", "@c.us", "@s.whatsapp.net", spaces, dashes. 

38_PHONE_STRIP = re.compile(r"[+\-\s]|@[a-z.]+$", re.IGNORECASE) 

39 

40 

41def normalize_phone(s: Optional[str]) -> str: 

42 """Reduce a phone / JID to its pure digit form for equality checks. 

43 

44 Examples: 

45 "+1 555 123 4567" -> "15551234567" 

46 "15551234567@c.us" -> "15551234567" 

47 "15551234567@s.whatsapp.net" -> "15551234567" 

48 """ 

49 if not s: 

50 return "" 

51 return _PHONE_STRIP.sub("", str(s)).strip() 

52 

53 

54class SelfChatHandler: 

55 """Routes self-chat messages to a private notebook-to-agent flow. 

56 

57 Usage from FlaskChannelIntegration._handle_message:: 

58 

59 if self._self_chat.is_self_message(message): 

60 return self._self_chat.handle(message, session) 

61 """ 

62 

63 def __init__( 

64 self, 

65 *, 

66 agent_api_url: str, 

67 owner_user_id: int, 

68 owner_prompt_id: int, 

69 device_id: Optional[str], 

70 session_manager: "SessionManager", 

71 response_router: "ChannelResponseRouter", 

72 registry: "ChannelRegistry", 

73 memory_user_id: Optional[str] = None, 

74 get_loop=None, 

75 ) -> None: 

76 self.agent_api_url = agent_api_url 

77 self.owner_user_id = owner_user_id 

78 self.owner_prompt_id = owner_prompt_id 

79 self.device_id = device_id 

80 self.session_manager = session_manager 

81 self.response_router = response_router 

82 self.registry = registry 

83 # MemoryGraph user key (defaults to owner user_id stringified) 

84 self.memory_user_id = memory_user_id or str(owner_user_id) 

85 # Callable returning the async loop that owns the adapters 

86 # (FlaskChannelIntegration._loop). Deferred because the loop 

87 # is spawned in a daemon thread after __init__. 

88 self._get_loop = get_loop or (lambda: None) 

89 

90 # ── Detection ───────────────────────────────────────────────── 

91 def is_self_message(self, message: "Message") -> bool: 

92 """True iff this message's sender matches the owner's phone as 

93 configured on the adapter *and* the feature is enabled on that 

94 adapter (``config.extra['enable_self_chat_agent']`` default True). 

95 """ 

96 adapter = self.registry.get(message.channel) 

97 if adapter is None: 

98 return False 

99 extra = getattr(adapter.config, "extra", None) or {} 

100 if extra.get("enable_self_chat_agent", True) is False: 

101 return False 

102 owner = extra.get("owner_phone") or extra.get("phone_number") 

103 if not owner: 

104 return False 

105 return ( 

106 bool(message.sender_id) 

107 and normalize_phone(message.sender_id) == normalize_phone(owner) 

108 ) 

109 

110 # ── Handling ────────────────────────────────────────────────── 

111 def handle(self, message: "Message", session) -> Optional[str]: 

112 """Persist → dispatch → reply in-thread. Returns the reply text 

113 (also already sent via adapter.send_message). 

114 

115 Returns None on a fatal routing failure (so the outer adapter 

116 can log + surface a generic error to the user).""" 

117 # 1. Persist to MemoryGraph (best-effort; never block routing) 

118 self._persist_note(message) 

119 

120 # 2. Track in session history (same as normal path) 

121 if session is not None: 

122 try: 

123 session.add_message("user", message.content) 

124 except Exception: # noqa: BLE001 

125 logger.debug("self-chat session.add_message failed", exc_info=True) 

126 

127 # 3. Dispatch to agent API with is_self_chat marker 

128 payload = { 

129 "user_id": self.owner_user_id, 

130 "prompt_id": self.owner_prompt_id, 

131 "prompt": message.content, 

132 "create_agent": False, # self-chat never needs agent creation 

133 "device_id": self.device_id, 

134 "channel_context": { 

135 "channel": message.channel, 

136 "sender_id": message.sender_id, 

137 "sender_name": message.sender_name, 

138 "chat_id": message.chat_id, 

139 "is_group": message.is_group, 

140 "is_self_chat": True, # lets chat hot path apply notebook heuristics 

141 "message_id": message.id, 

142 }, 

143 } 

144 try: 

145 resp = pooled_post(self.agent_api_url, json=payload, timeout=120) 

146 except requests.Timeout: 

147 logger.error("self-chat agent timeout") 

148 return "(timed out — try again)" 

149 except Exception as e: # noqa: BLE001 

150 logger.error("self-chat agent call failed: %s", e) 

151 return None 

152 

153 if resp.status_code != 200: 

154 logger.error("self-chat agent %s: %s", resp.status_code, resp.text[:200]) 

155 return None 

156 

157 try: 

158 reply = (resp.json() or {}).get("response") or "" 

159 except Exception: # noqa: BLE001 

160 reply = "" 

161 reply = reply or "✓ noted" 

162 

163 # 4. Track assistant turn + log user message (no fan-out: private) 

164 if session is not None: 

165 try: 

166 session.add_message("assistant", reply) 

167 except Exception: # noqa: BLE001 

168 logger.debug("self-chat session.add_message assistant failed", 

169 exc_info=True) 

170 try: 

171 self.response_router.log_user_message( 

172 self.owner_user_id, message.channel, message.content, 

173 ) 

174 except Exception: # noqa: BLE001 

175 logger.debug("self-chat log_user_message failed", exc_info=True) 

176 

177 # 5. Reply in the same WhatsApp thread (chat_id = owner @c.us). 

178 # We reach into the adapter directly — fan_out=False is 

179 # explicit: the self-chat is private and must not leak to 

180 # other channels the owner has bound (Telegram, Discord…). 

181 self._send_reply_in_thread(message, reply) 

182 return reply 

183 

184 # ── Internals ───────────────────────────────────────────────── 

185 def _persist_note(self, message: "Message") -> None: 

186 """Write the note to MemoryGraph with memory_type='self_note'.""" 

187 try: 

188 from core.platform_paths import get_memory_graph_dir 

189 from integrations.memory.memory_graph import MemoryGraph # type: ignore 

190 except Exception: # noqa: BLE001 

191 logger.debug("self-chat persist: MemoryGraph unavailable", exc_info=True) 

192 return 

193 try: 

194 session_key = f"self_chat:{self.memory_user_id}" 

195 db_path = get_memory_graph_dir(session_key) 

196 mg = MemoryGraph(db_path=db_path, user_id=self.memory_user_id) 

197 mg.register( 

198 content=message.content, 

199 metadata={ 

200 "memory_type": "self_note", 

201 "source_agent": "self_chat", 

202 "session_id": session_key, 

203 "channel": message.channel, 

204 "message_id": message.id, 

205 }, 

206 context_snapshot=f"Self-chat note via {message.channel}", 

207 ) 

208 except Exception as e: # noqa: BLE001 

209 logger.debug("self-chat persist failed: %s", e) 

210 

211 def _send_reply_in_thread(self, message: "Message", reply: str) -> None: 

212 """Send ``reply`` via ``registry.send_to_channel`` onto the 

213 integration's asyncio loop. No fan-out — this is a private thread. 

214 """ 

215 adapter = self.registry.get(message.channel) 

216 if adapter is None: 

217 logger.warning("self-chat: adapter '%s' gone", message.channel) 

218 return 

219 chat_id = message.chat_id or message.sender_id 

220 coro = self.registry.send_to_channel(message.channel, chat_id, reply) 

221 

222 loop = self._get_loop() 

223 if loop is not None and loop.is_running(): 

224 try: 

225 asyncio.run_coroutine_threadsafe(coro, loop) 

226 return 

227 except Exception as e: # noqa: BLE001 

228 logger.error("self-chat schedule on loop failed: %s", e) 

229 

230 # Fallback: no running adapter loop — create a short-lived one. 

231 # Rare path; normal Nunba boot attaches a loop via 

232 # FlaskChannelIntegration.start(). 

233 try: 

234 asyncio.run(coro) 

235 except RuntimeError: 

236 logger.error("self-chat send dropped — no async loop available") 

237 except Exception as e: # noqa: BLE001 

238 logger.error("self-chat direct-send failed: %s", e)