Coverage for integrations / social / whatsapp_supervisor.py: 41.5%

171 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""WhatsApp gateway supervisor — embedded Baileys via Node subprocess. 

2 

3Sister of ``livekit_supervisor.py``: same filesystem layout under 

4``~/.hevolve/<service>/``, same daemon-thread restart pattern, same 

5``core.subprocess_safe.hidden_popen_kwargs`` for silent Windows spawn, 

6same per-deploy-mode gating. 

7 

8Why this exists: 

9 The existing WhatsApp adapter at ``integrations/channels/whatsapp_adapter.py`` 

10 talks WAHA's HTTP+WS API at ``http://localhost:3000`` (or whatever 

11 ``WHATSAPP_API_URL`` resolves to). Asking every Nunba install to 

12 install Docker + run WAHA's container is a heavy ask — the user 

13 explicitly told us "Docker is a new system on the user machine and 

14 we cannot rely on that". 

15 

16 This supervisor replaces the WAHA-Docker path with an embedded 

17 Baileys gateway (``integrations/channels/whatsapp/gateway.js``) 

18 that exposes the same WAHA API subset on the same port, so the 

19 adapter stays identical. The only "new system" is Node.js — but 

20 Nunba's web build already requires Node, so most user machines 

21 already have it. When Node is missing we surface a clear single- 

22 line error and the adapter falls back to the existing 

23 ``WHATSAPP_API_URL`` override path (operator-managed remote WAHA). 

24 

25Singleton: one instance per HARTOS process. Lifecycle owned by 

26``start_supervisor()``, called from ``hartos_bootstrap.py``. 

27""" 

28 

29from __future__ import annotations 

30 

31import json 

32import logging 

33import os 

34import shutil 

35import socket 

36import subprocess 

37import threading 

38import time 

39from pathlib import Path 

40from typing import Any, Dict, Optional 

41 

42logger = logging.getLogger(__name__) 

43 

44 

45# ── Filesystem layout (mirrors livekit_supervisor) ─────────────────── 

def _hevolve_home() -> Path:
    """Resolve the root directory under which hevolve services keep state.

    A non-empty ``HEVOLVE_HOME`` environment variable wins (with ``~``
    expanded); otherwise the root is ``.hevolve`` inside the current
    user's home directory.
    """
    override = os.environ.get('HEVOLVE_HOME')
    if not override:
        # Unset and empty are treated alike: use the per-user default.
        return Path.home() / '.hevolve'
    return Path(override).expanduser()

51 

52 

def _whatsapp_home() -> Path:
    """Return the ``whatsapp`` directory directly under the hevolve root."""
    root = _hevolve_home()
    return root.joinpath('whatsapp')

55 

56 

def _gateway_dir() -> Path:
    """Locate the directory holding ``gateway.js`` and ``package.json``.

    The path is derived from this module's own location: two directory
    levels up from the file, then ``channels/whatsapp``. npm
    dependencies are installed into this directory by
    ``ensure_baileys_deps()``.
    """
    this_file = Path(__file__).resolve()
    package_root = this_file.parent.parent
    return package_root / 'channels' / 'whatsapp'

61 

62 

63# ── Deploy-mode gating (mirrors livekit_supervisor pattern) ────────── 

def _deploy_mode() -> str:
    """Return ``HEVOLVE_DEPLOY_MODE`` lower-cased and stripped.

    Defaults to ``'flat'`` only when the variable is unset; a variable
    that is set but empty yields the empty string.
    """
    raw_mode = os.environ.get('HEVOLVE_DEPLOY_MODE', 'flat')
    normalised = raw_mode.lower()
    return normalised.strip()

66 

67 

def _is_loopback_url(url: str) -> bool:
    """True iff the host component of *url* names this machine's loopback.

    Only the parsed host is compared, so a remote URL that merely
    contains ``localhost`` or ``127.0.0.1`` somewhere (sub-domain, path,
    query string) is not mistaken for a local one. An unparseable URL
    is reported as non-loopback.
    """
    from urllib.parse import urlsplit

    # Without a scheme ("localhost:3000") urlsplit would read the host as
    # the scheme; a leading "//" forces it to be parsed as the netloc.
    has_netloc_marker = url.startswith('//') or '://' in url
    candidate = url if has_netloc_marker else '//' + url
    try:
        host = urlsplit(candidate).hostname or ''
    except ValueError:
        return False
    return host.lower() in ('localhost', '127.0.0.1', '::1')


def supervisor_should_run() -> bool:
    """True iff this process should host the Baileys gateway itself.

    Override:
      WHATSAPP_AUTOSTART=0 → force-disable (operator runs WAHA elsewhere)
      WHATSAPP_AUTOSTART=1 → force-enable (regardless of deploy mode)
      WHATSAPP_API_URL set to a non-loopback address → operator
        explicitly wants remote WAHA; we skip spawning the local
        gateway so the adapter's existing override path takes effect.

    With no override in effect, the gateway runs only in the ``flat``
    and ``regional`` deploy modes.
    """
    forced = os.environ.get('WHATSAPP_AUTOSTART')
    if forced == '0':
        return False
    if forced == '1':
        return True

    # Operator points at a remote WAHA — let them. The host is parsed out
    # of the URL rather than substring-matched, so something like
    # "http://localhost.example.com" is correctly treated as remote.
    api_url = os.environ.get('WHATSAPP_API_URL', '').strip()
    if api_url and not _is_loopback_url(api_url):
        return False

    return _deploy_mode() in ('flat', 'regional')

94 

95 

96# ── Port helpers (lifted from livekit_supervisor.py — same idiom) ──── 

def _port_in_use(port: int) -> bool:
    """Report whether a TCP connect to 127.0.0.1:*port* succeeds.

    A successful connect means something is already listening there.
    The supervisor then skips its own spawn and assumes an
    operator-managed gateway owns the port. The probe gives up after
    half a second.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    probe.settimeout(0.5)
    try:
        # connect_ex reports failure as an errno instead of raising.
        status = probe.connect_ex(('127.0.0.1', port))
        return status == 0
    finally:
        try:
            probe.close()
        except OSError:
            # Closing is best-effort; the answer is already decided.
            pass

113 

114 

def _gateway_port() -> int:
    """Return the TCP port the gateway should listen on.

    Reads ``WHATSAPP_GATEWAY_PORT`` and defaults to 3000 when it is
    unset or blank. A value that is not an integer in 1..65535 is
    ignored with a warning instead of raising, because this helper is
    called from ``info()`` and ``start()``, where a typo in the
    environment should not surface as an unhandled ``ValueError``.
    """
    default_port = 3000
    raw = os.environ.get('WHATSAPP_GATEWAY_PORT', '').strip()
    if not raw:
        return default_port
    try:
        port = int(raw)
    except ValueError:
        port = -1  # falls through to the range check below
    if 1 <= port <= 65535:
        return port
    logging.getLogger(__name__).warning(
        "whatsapp_supervisor: ignoring invalid WHATSAPP_GATEWAY_PORT=%r; "
        "using default %d", raw, default_port)
    return default_port

117 

118 

119# ── Node + dep ensures (one-time on first start) ───────────────────── 

def ensure_node() -> Optional[str]:
    """Look up the ``node`` executable on PATH.

    Returns its path, or None when it cannot be found; the caller is
    responsible for telling the user to install Node in that case.
    """
    node_path = shutil.which('node')
    return node_path

125 

126 

def ensure_npm() -> Optional[str]:
    """Look up the ``npm`` executable on PATH; None when it is absent."""
    npm_path = shutil.which('npm')
    return npm_path

130 

131 

def ensure_baileys_deps() -> bool:
    """Make sure the gateway's npm dependencies are installed.

    Idempotent: when ``node_modules/`` already exists under the gateway
    directory nothing is run. Otherwise ``npm install`` is executed
    there with a 180-second timeout.

    Returns True when the dependencies are ready (already present or
    freshly installed). Returns False when the gateway directory is
    missing, npm is not on PATH, or the install fails or times out.
    Each failure is logged and the caller falls back.
    """
    target = _gateway_dir()
    if not target.exists():
        logger.error(
            "whatsapp_supervisor: gateway dir missing at %s", target)
        return False

    # The presence of node_modules/ is the only readiness signal checked.
    if (target / 'node_modules').is_dir():
        return True

    npm_path = ensure_npm()
    if npm_path is None:
        logger.warning(
            "whatsapp_supervisor: npm not on PATH; cannot install Baileys "
            "deps. Set WHATSAPP_API_URL to a managed WAHA endpoint, or "
            "install Node.js (which provides npm).")
        return False

    # First-run install. hidden_popen_kwargs() keeps the spawn silent on
    # Windows (no console window).
    from core.subprocess_safe import hidden_popen_kwargs
    install_cmd = [
        npm_path, 'install', '--no-audit', '--no-fund',
        '--loglevel=error', '--prefer-offline',
    ]
    logger.info(
        "whatsapp_supervisor: installing Baileys deps in %s "
        "(one-time, ~80 MB)…", target)
    try:
        completed = subprocess.run(
            install_cmd,
            cwd=str(target),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merged so one capture holds it all
            text=True,
            timeout=180,
            **hidden_popen_kwargs(),
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        logger.warning("whatsapp_supervisor: npm install failed (%s)", exc)
        return False

    if completed.returncode == 0:
        logger.info("whatsapp_supervisor: Baileys deps installed")
        return True

    # Only the first 500 characters of npm's output go to the log.
    logger.warning(
        "whatsapp_supervisor: npm install returned %d:\n%s",
        completed.returncode, (completed.stdout or '')[:500])
    return False

182 

183 

184# ── Supervisor (mirrors livekit_supervisor._Supervisor) ────────────── 

class _Supervisor:
    """Owns the gateway child process and the thread that respawns it.

    ``start()`` runs the preflight checks and launches a daemon thread
    executing ``_run()``. ``_run()`` spawns ``node gateway.js``,
    forwards its output to the logger and respawns it with exponential
    backoff until ``stop()`` is called. The thread is a daemon, so it
    does not keep the parent process alive.
    """

    # A gateway run at least this long counts as healthy and resets the
    # respawn backoff to its initial value.
    _HEALTHY_UPTIME_S = 60.0

    def __init__(self) -> None:
        self.proc: Optional[subprocess.Popen] = None
        self.thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()
        self.last_error: Optional[str] = None
        self.last_started: Optional[float] = None
        self.restart_count = 0
        # Serialises spawning (in _run) against termination (in stop).
        self.lock = threading.Lock()

    def info(self) -> Dict[str, Any]:
        """Return a snapshot of the supervisor state for diagnostics."""
        proc = self.proc
        running = proc is not None and proc.poll() is None
        return {
            'running': running,
            'port': _gateway_port(),
            'pid': proc.pid if running else None,
            'last_error': self.last_error,
            'last_started': self.last_started,
            'restart_count': self.restart_count,
        }

    def start(self) -> Dict[str, Any]:
        """Run preflight checks and launch the supervisor thread.

        No-op when the thread is already alive. When Node is missing,
        the npm dependencies cannot be made ready, or the port is
        already taken, the reason is stored in ``last_error`` and no
        thread is started. Always returns ``info()``.
        """
        if self.thread is not None and self.thread.is_alive():
            return self.info()

        node = ensure_node()
        if node is None:
            self.last_error = (
                'Node.js not on PATH; install Node ≥18 or set '
                'WHATSAPP_API_URL to a remote WAHA endpoint')
            logger.warning("whatsapp_supervisor: %s", self.last_error)
            return self.info()

        if not ensure_baileys_deps():
            self.last_error = (
                'Baileys npm install failed — see prior log lines')
            return self.info()

        port = _gateway_port()
        if _port_in_use(port):
            self.last_error = (
                f'port {port} already in use; assuming operator-managed '
                f'WhatsApp gateway is running and skipping spawn')
            logger.info("whatsapp_supervisor: %s", self.last_error)
            return self.info()

        # A previous stop() leaves the event set; without clearing it the
        # new thread's loop would exit before spawning anything.
        self.stop_event.clear()
        self.thread = threading.Thread(
            target=self._run, daemon=True, name='whatsapp-supervisor')
        self.thread.start()
        return self.info()

    def stop(self) -> None:
        """Signal the supervisor thread to stop and terminate the gateway.

        Best-effort: the child gets ``terminate()`` and five seconds to
        exit before ``kill()``. Errors while stopping are logged at
        debug level, never raised.
        """
        # Set the event BEFORE taking the lock: _run re-checks it under
        # the same lock, so it cannot spawn a child this call would miss.
        self.stop_event.set()
        with self.lock:
            proc = self.proc
            if proc is not None and proc.poll() is None:
                try:
                    proc.terminate()
                    try:
                        proc.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        proc.kill()
                        # Reap the killed child so it does not linger as
                        # a zombie.
                        proc.wait(timeout=5)
                except (OSError, subprocess.TimeoutExpired):
                    logger.debug(
                        "whatsapp_supervisor: error while stopping gateway",
                        exc_info=True)

        # Give the thread a moment to notice, so a start() issued right
        # after stop() does not find it still alive. Joined outside the
        # lock because the thread may need the lock to wind down.
        thread = self.thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)

    def _drain_output(self, proc: subprocess.Popen) -> None:
        """Forward the child's merged stdout/stderr to the logger.

        Lines that parse as a JSON object are logged as
        ``<event> <remaining fields>``; anything else is logged verbatim.
        Returns at end-of-stream, or after the first line read once a
        stop has been requested.
        """
        if proc.stdout is None:
            return
        for raw in proc.stdout:
            if self.stop_event.is_set():
                break
            line = raw.rstrip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except ValueError:
                payload = None
            if isinstance(payload, dict):
                event = payload.get('event') or 'log'
                logger.info(
                    "whatsapp_supervisor: %s %s",
                    event,
                    {k: v for k, v in payload.items() if k != 'event'},
                )
            else:
                # Not JSON, or JSON that is not an object: log as text.
                logger.info("whatsapp_supervisor: %s", line)

    def _run(self) -> None:
        """Thread body: spawn the gateway and respawn it until stopped.

        The delay between respawns doubles from 1 s up to 60 s and is
        reset after a run of at least ``_HEALTHY_UPTIME_S`` seconds. The
        loop ends when ``stop_event`` is set or when the node executable
        cannot be found at spawn time.
        """
        from core.subprocess_safe import hidden_popen_kwargs
        node = ensure_node() or 'node'
        gateway_js = _gateway_dir() / 'gateway.js'
        env = os.environ.copy()
        env['WHATSAPP_GATEWAY_PORT'] = str(_gateway_port())
        env['HEVOLVE_HOME'] = str(_hevolve_home())

        backoff = 1.0
        while not self.stop_event.is_set():
            try:
                cmd = [node, str(gateway_js)]
                logger.info(
                    "whatsapp_supervisor: spawning %s on port %d",
                    ' '.join(cmd), _gateway_port())
                with self.lock:
                    # Re-check under the lock: if stop() ran since the
                    # loop condition was evaluated, do not spawn.
                    if self.stop_event.is_set():
                        break
                    proc = subprocess.Popen(
                        cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        cwd=str(_gateway_dir()),
                        env=env,
                        text=True,
                        bufsize=1,  # line-buffered for line-by-line drain
                        **hidden_popen_kwargs(),
                    )
                    self.proc = proc
                    self.last_started = time.time()
                    self.restart_count += 1
                    spawned_at = time.monotonic()

                # Stream the child's output into the logger so operators
                # see gateway activity without a separate log file.
                self._drain_output(proc)

                rc = proc.wait()
                if proc.stdout is not None:
                    # Release the pipe now; do not leave it to the GC.
                    proc.stdout.close()
                self.last_error = f'gateway exited rc={rc}'
                if self.stop_event.is_set():
                    break
                if time.monotonic() - spawned_at >= self._HEALTHY_UPTIME_S:
                    backoff = 1.0
                logger.warning(
                    "whatsapp_supervisor: gateway exited rc=%s; "
                    "respawning in %.1fs", rc, backoff)
            except FileNotFoundError as e:
                # Retrying cannot help when the executable is absent.
                self.last_error = f'node not found: {e}'
                logger.error("whatsapp_supervisor: %s", self.last_error)
                break
            except Exception as e:  # noqa: BLE001 — supervisor catches all
                self.last_error = f'spawn error: {e}'
                logger.exception("whatsapp_supervisor: spawn error")

            # Event.wait() is an interruptible sleep: stop() ends the
            # backoff at once instead of after up to a minute.
            if self.stop_event.wait(backoff):
                break
            backoff = min(backoff * 2, 60.0)

324 

325 

# Process-wide singleton; created lazily by start_supervisor().
_supervisor: Optional['_Supervisor'] = None
# Guards creation of, and start/stop calls on, the singleton.
_supervisor_lock = threading.Lock()


def start_supervisor() -> Dict[str, Any]:
    """Create the singleton supervisor if needed and start it.

    Idempotent entrypoint. When ``supervisor_should_run()`` is False
    nothing is created or started; the gating inputs are logged and
    ``{'running': False, 'reason': 'gated_off'}`` is returned.
    Otherwise the result of the supervisor's ``start()`` is returned.
    """
    global _supervisor

    if supervisor_should_run():
        with _supervisor_lock:
            if _supervisor is None:
                _supervisor = _Supervisor()
            return _supervisor.start()

    logger.info(
        "whatsapp_supervisor: skipped (deploy_mode=%s, autostart=%s, "
        "WHATSAPP_API_URL=%s)",
        _deploy_mode(),
        os.environ.get('WHATSAPP_AUTOSTART'),
        os.environ.get('WHATSAPP_API_URL'),
    )
    return {'running': False, 'reason': 'gated_off'}

349 

350 

def stop_supervisor() -> None:
    """Stop the singleton supervisor, if one was ever created.

    Best-effort shutdown; does nothing when no supervisor exists.
    """
    with _supervisor_lock:
        current = _supervisor
        if current is None:
            return
        current.stop()

357 

358 

def info() -> Dict[str, Any]:
    """Report supervisor state for health endpoints and diagnostics.

    Returns ``{'running': False, 'reason': 'not_started'}`` until a
    supervisor has been created; afterwards returns the supervisor's
    own ``info()`` snapshot.
    """
    current = _supervisor
    if current is not None:
        return current.info()
    return {'running': False, 'reason': 'not_started'}