Coverage for integrations / social / whatsapp_supervisor.py: 41.5%
171 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""WhatsApp gateway supervisor — embedded Baileys via Node subprocess.

Sister of ``livekit_supervisor.py``: same filesystem layout under
``~/.hevolve/<service>/``, same daemon-thread restart pattern, same
``core.subprocess_safe.hidden_popen_kwargs`` for silent Windows spawn,
same per-deploy-mode gating.

Why this exists:
    The existing WhatsApp adapter at ``integrations/channels/whatsapp_adapter.py``
    talks WAHA's HTTP+WS API at ``http://localhost:3000`` (or whatever
    ``WHATSAPP_API_URL`` resolves to). Asking every Nunba install to
    install Docker and run WAHA's container is a heavy ask — the user
    explicitly told us "Docker is a new system on the user machine and
    we cannot rely on that".

    This supervisor replaces the WAHA-Docker path with an embedded
    Baileys gateway (``integrations/channels/whatsapp/gateway.js``)
    that exposes the same WAHA API subset on the same port, so the
    adapter stays unchanged. The only "new system" is Node.js — but
    Nunba's web build already requires Node, so most user machines
    already have it. When Node is missing we surface a clear one-line
    error, and the adapter falls back to the existing
    ``WHATSAPP_API_URL`` override path (operator-managed remote WAHA).

Singleton: one instance per HARTOS process. Lifecycle owned by
``start_supervisor()``, called from ``hartos_bootstrap.py``.
"""
from __future__ import annotations

import ipaddress
import json
import logging
import os
import shutil
import socket
import subprocess
import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional
from urllib.parse import urlsplit
# Module-level logger; every message below is prefixed "whatsapp_supervisor:".
logger = logging.getLogger(name=__name__)
# ── Filesystem layout (mirrors livekit_supervisor) ───────────────────
def _hevolve_home() -> Path:
    """Root of the per-user HEVOLVE state tree.

    Uses ``$HEVOLVE_HOME`` (with ``~`` expanded) when it is set to a
    non-empty value; otherwise ``~/.hevolve``.
    """
    override = os.environ.get('HEVOLVE_HOME')
    return Path(override).expanduser() if override else Path.home() / '.hevolve'
def _whatsapp_home() -> Path:
    """Per-service state directory: ``<hevolve home>/whatsapp``."""
    return _hevolve_home().joinpath('whatsapp')
def _gateway_dir() -> Path:
    """Directory in the repo that holds gateway.js and its package.json.

    Resolved relative to this file as ``../channels/whatsapp``. Its npm
    dependencies are installed once by ensure_baileys_deps().
    """
    this_file = Path(__file__).resolve()
    return this_file.parent.parent.joinpath('channels', 'whatsapp')
# ── Deploy-mode gating (mirrors livekit_supervisor pattern) ──────────
def _deploy_mode() -> str:
    """Normalised (lower-cased, stripped) ``$HEVOLVE_DEPLOY_MODE``; default ``'flat'``."""
    raw_mode = os.environ.get('HEVOLVE_DEPLOY_MODE', 'flat')
    return raw_mode.lower().strip()
def _is_loopback_url(url: str) -> bool:
    """True iff *url*'s host is ``localhost`` or a loopback IP literal.

    Accepts full URLs (``http://127.0.0.1:3000``) as well as bare
    ``host:port`` strings. The host is parsed rather than substring-
    matched, so ``http://mylocalhost.example.com`` or a URL that merely
    mentions ``localhost`` in its path/query is correctly treated as
    remote, and ``http://[::1]:3000`` is correctly treated as local.
    """
    candidate = url.strip()
    if '://' not in candidate:
        # Without a scheme, urlsplit puts everything in .path; a leading
        # '//' makes it parse the text as a netloc instead.
        candidate = '//' + candidate
    try:
        host = urlsplit(candidate).hostname  # already lower-cased
    except ValueError:  # e.g. unbalanced IPv6 brackets
        return False
    if not host:
        return False
    if host == 'localhost':
        return True
    try:
        return ipaddress.ip_address(host).is_loopback
    except ValueError:  # a DNS name other than localhost
        return False


def supervisor_should_run() -> bool:
    """True iff this process should host the Baileys gateway itself.

    Override:
      WHATSAPP_AUTOSTART=0 → force-disable (operator runs WAHA elsewhere)
      WHATSAPP_AUTOSTART=1 → force-enable (regardless of deploy mode)
      WHATSAPP_API_URL set to a non-loopback host → operator
          explicitly wants remote WAHA; we skip spawning the local
          gateway so the adapter's existing override path takes effect.

    Otherwise the gateway runs only in the 'flat' and 'regional'
    deploy modes.
    """
    forced = os.environ.get('WHATSAPP_AUTOSTART')
    if forced == '0':
        return False
    if forced == '1':
        return True

    # Operator points at a remote WAHA — let them. Single source of
    # truth for "use external gateway" is the adapter's WHATSAPP_API_URL
    # env (already honoured at adapter init).
    api_url = os.environ.get('WHATSAPP_API_URL', '').strip()
    if api_url and not _is_loopback_url(api_url):
        return False

    return _deploy_mode() in ('flat', 'regional')
# ── Port helpers (lifted from livekit_supervisor.py — same idiom) ────
def _port_in_use(port: int) -> bool:
    """Probe 127.0.0.1:*port* with a 0.5 s TCP connect.

    A successful connect means some listener already owns the port. The
    supervisor then skips spawning Baileys and assumes an operator-managed
    WAHA is serving there — the same short-circuit livekit_supervisor uses.
    Errors while closing the probe socket are ignored.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    probe.settimeout(0.5)
    try:
        status = probe.connect_ex(('127.0.0.1', port))
    finally:
        try:
            probe.close()
        except OSError:
            pass
    return status == 0
_DEFAULT_GATEWAY_PORT = 3000


def _gateway_port() -> int:
    """Return the TCP port the embedded gateway listens on.

    Read from ``$WHATSAPP_GATEWAY_PORT`` (default 3000). An empty,
    non-integer, or out-of-range (not 1-65535) value falls back to the
    default with a warning instead of raising. This is called from the
    ``info()`` reflection used by /health endpoints and from the
    supervisor loop; a ValueError from a mistyped env var would otherwise
    break those callers.
    """
    raw = os.environ.get('WHATSAPP_GATEWAY_PORT', '').strip()
    if not raw:
        return _DEFAULT_GATEWAY_PORT
    try:
        port = int(raw)
    except ValueError:
        port = -1  # force the out-of-range fallback below
    if 1 <= port <= 65535:
        return port
    logger.warning(
        "whatsapp_supervisor: invalid WHATSAPP_GATEWAY_PORT=%r; "
        "falling back to %d", raw, _DEFAULT_GATEWAY_PORT)
    return _DEFAULT_GATEWAY_PORT
# ── Node + dep ensures (one-time on first start) ─────────────────────
def ensure_node() -> Optional[str]:
    """Locate the ``node`` executable on PATH.

    Returns its path, or None when Node isn't installed. On None the caller
    is expected to tell the user to install Node.
    """
    node_path = shutil.which('node')
    return node_path
def ensure_npm() -> Optional[str]:
    """Locate ``npm`` on PATH; None when absent (see ensure_node)."""
    npm_path = shutil.which('npm')
    return npm_path
def _discard_partial_install(node_modules: Path) -> None:
    """Best-effort removal of a ``node_modules/`` left by a failed install."""
    if node_modules.is_dir():
        shutil.rmtree(node_modules, ignore_errors=True)
    if node_modules.exists():
        # e.g. files still locked by an npm child process on Windows
        logger.warning(
            "whatsapp_supervisor: could not fully remove partial %s; "
            "delete it manually so the next start reinstalls", node_modules)


def ensure_baileys_deps() -> bool:
    """Install gateway.js's npm dependencies once, if needed.

    Idempotent: returns True immediately when ``node_modules/`` already
    exists as a directory under the gateway dir. Otherwise runs
    ``npm install`` there, with a 180 s timeout and a hidden console
    window on Windows.

    Returns:
        True iff the deps are ready (cached or freshly installed). False
        when the gateway dir is missing, npm is not on PATH, or the
        install fails. On False the caller logs and falls back.

    A failed or timed-out install removes the ``node_modules/`` it may
    have partially written. The cache check only tests that the directory
    exists, so a half-populated tree would otherwise be reported as
    "ready" on every later start.
    """
    gateway_dir = _gateway_dir()
    if not gateway_dir.exists():
        logger.error(
            "whatsapp_supervisor: gateway dir missing at %s", gateway_dir)
        return False

    node_modules = gateway_dir / 'node_modules'
    if node_modules.is_dir():
        return True

    npm = ensure_npm()
    if npm is None:
        logger.warning(
            "whatsapp_supervisor: npm not on PATH; cannot install Baileys "
            "deps. Set WHATSAPP_API_URL to a managed WAHA endpoint, or "
            "install Node.js (which provides npm).")
        return False

    # First-run install — pulls @whiskeysockets/baileys + express.
    # Hidden cmd window on Windows.
    from core.subprocess_safe import hidden_popen_kwargs
    logger.info(
        "whatsapp_supervisor: installing Baileys deps in %s "
        "(one-time, ~80 MB)…", gateway_dir)
    try:
        result = subprocess.run(
            [npm, 'install', '--no-audit', '--no-fund',
             '--loglevel=error', '--prefer-offline'],
            cwd=str(gateway_dir),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            timeout=180,
            **hidden_popen_kwargs(),
        )
    except (subprocess.TimeoutExpired, OSError) as e:
        logger.warning("whatsapp_supervisor: npm install failed (%s)", e)
        _discard_partial_install(node_modules)
        return False
    if result.returncode != 0:
        logger.warning(
            "whatsapp_supervisor: npm install returned %d:\n%s",
            result.returncode, (result.stdout or '')[:500])
        _discard_partial_install(node_modules)
        return False
    logger.info("whatsapp_supervisor: Baileys deps installed")
    return True
# ── Supervisor (mirrors livekit_supervisor._Supervisor) ──────────────
class _Supervisor:
    """One per process: owns the gateway subprocess and the daemon thread
    that respawns it with exponential backoff.

    The thread is a daemon, so it never blocks interpreter exit; ``stop()``
    is the orderly shutdown path.
    """

    # A run lasting at least this long counts as healthy and resets the
    # respawn backoff. A crash after hours of uptime is then retried
    # after 1 s rather than after the last (possibly 60 s) delay.
    _HEALTHY_UPTIME_S = 60.0
    _MAX_BACKOFF_S = 60.0

    def __init__(self) -> None:
        self.proc: Optional[subprocess.Popen] = None
        self.thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()
        self.last_error: Optional[str] = None
        self.last_started: Optional[float] = None
        # Incremented on every spawn, including the first one.
        self.restart_count = 0
        # Serialises spawning in _run() against terminate/kill in stop().
        self.lock = threading.Lock()

    def info(self) -> Dict[str, Any]:
        """Snapshot of the supervisor state for health/diagnostics."""
        proc = self.proc  # single read: _run() may swap it concurrently
        running = bool(proc and proc.poll() is None)
        return {
            'running': running,
            'port': _gateway_port(),
            'pid': proc.pid if running else None,
            'last_error': self.last_error,
            'last_started': self.last_started,
            'restart_count': self.restart_count,
        }

    def start(self) -> Dict[str, Any]:
        """Start the supervisor thread unless it is already alive.

        Nothing is spawned, and ``last_error`` explains why, when Node is
        missing, the npm install fails, or the gateway port is already
        taken. Always returns ``info()``.
        """
        if self.thread is not None and self.thread.is_alive():
            return self.info()

        if ensure_node() is None:
            self.last_error = (
                'Node.js not on PATH; install Node ≥18 or set '
                'WHATSAPP_API_URL to a remote WAHA endpoint')
            logger.warning("whatsapp_supervisor: %s", self.last_error)
            return self.info()

        if not ensure_baileys_deps():
            self.last_error = (
                'Baileys npm install failed — see prior log lines')
            return self.info()

        port = _gateway_port()
        if _port_in_use(port):
            self.last_error = (
                f'port {port} already in use; assuming operator-managed '
                f'WhatsApp gateway is running and skipping spawn')
            logger.info("whatsapp_supervisor: %s", self.last_error)
            return self.info()

        # A previous stop() leaves the event set; without clearing it the
        # new thread's loop would exit immediately and start-after-stop
        # would silently do nothing.
        self.stop_event.clear()
        self.thread = threading.Thread(
            target=self._run, daemon=True, name='whatsapp-supervisor')
        self.thread.start()
        return self.info()

    def stop(self) -> None:
        """Ask the loop to exit and terminate the gateway process.

        Best-effort: terminate(), then kill() after 5 s. OS errors are
        swallowed.
        """
        self.stop_event.set()
        with self.lock:
            self._terminate(self.proc)

    @staticmethod
    def _terminate(proc: Optional[subprocess.Popen]) -> None:
        """Terminate *proc* if still running, escalating to kill; reap it."""
        if proc is None or proc.poll() is not None:
            return
        try:
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait(timeout=5)  # reap so no zombie is left behind
        except (OSError, subprocess.TimeoutExpired):
            pass

    def _drain_output(self, proc: subprocess.Popen) -> None:
        """Forward the gateway's stdout to the logger, line by line.

        This lets operators see what Baileys is doing without a separate
        log file. Lines that parse as JSON objects are logged as
        ``<event> {other fields}`` (event defaults to 'log'); anything
        else is logged verbatim. Returns at EOF, or at the first line
        read after stop() was requested.
        """
        if proc.stdout is None:
            return
        for raw in proc.stdout:
            if self.stop_event.is_set():
                break
            line = raw.rstrip()
            if not line:
                continue
            try:
                payload = json.loads(line)
                event = payload.get('event') or 'log'
            except (ValueError, AttributeError):  # not JSON / not an object
                logger.info("whatsapp_supervisor: %s", line)
                continue
            logger.info(
                "whatsapp_supervisor: %s %s",
                event,
                {k: v for k, v in payload.items() if k != 'event'},
            )

    def _run(self) -> None:
        """Thread body: spawn the gateway, stream its output, respawn on exit."""
        from core.subprocess_safe import hidden_popen_kwargs
        node = ensure_node() or 'node'
        gateway_dir = _gateway_dir()
        port = _gateway_port()
        env = os.environ.copy()
        env['WHATSAPP_GATEWAY_PORT'] = str(port)
        env['HEVOLVE_HOME'] = str(_hevolve_home())
        cmd = [node, str(gateway_dir / 'gateway.js')]

        backoff = 1.0
        while not self.stop_event.is_set():
            proc: Optional[subprocess.Popen] = None
            try:
                logger.info(
                    "whatsapp_supervisor: spawning %s on port %d",
                    ' '.join(cmd), port)
                with self.lock:
                    # Re-check under the lock: stop() may have run since
                    # the loop condition, and a process spawned now would
                    # be orphaned (nobody would terminate it).
                    if self.stop_event.is_set():
                        break
                    proc = subprocess.Popen(
                        cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        cwd=str(gateway_dir),
                        env=env,
                        text=True,
                        # Decode explicitly as UTF-8. The locale default
                        # (e.g. cp1252 on Windows) can raise mid-stream on
                        # non-ASCII output; 'replace' keeps the drain alive.
                        encoding='utf-8',
                        errors='replace',
                        bufsize=1,  # line-buffered for line-by-line drain
                        **hidden_popen_kwargs(),
                    )
                    self.proc = proc
                started = time.time()
                self.last_started = started
                self.restart_count += 1

                self._drain_output(proc)

                rc = proc.wait()
                self.last_error = f'gateway exited rc={rc}'
                if self.stop_event.is_set():
                    break
                if time.time() - started >= self._HEALTHY_UPTIME_S:
                    backoff = 1.0
                logger.warning(
                    "whatsapp_supervisor: gateway exited rc=%s; "
                    "respawning in %.1fs", rc, backoff)
            except FileNotFoundError as e:
                self.last_error = f'node not found: {e}'
                logger.error("whatsapp_supervisor: %s", self.last_error)
                break
            except Exception as e:  # noqa: BLE001 — supervisor catches all
                self.last_error = f'spawn error: {e}'
                logger.exception("whatsapp_supervisor: spawn error")
                # The failure may have happened after Popen (e.g. while
                # draining output); never respawn on top of a live process.
                with self.lock:
                    self._terminate(proc)
                if self.stop_event.is_set():
                    break
            finally:
                if proc is not None and proc.stdout is not None:
                    proc.stdout.close()
            # Interruptible backoff: stop() wakes this immediately.
            if self.stop_event.wait(backoff):
                break
            backoff = min(backoff * 2, self._MAX_BACKOFF_S)
# Process-wide singleton, created lazily by start_supervisor() under the lock.
_supervisor_lock = threading.Lock()
_supervisor: Optional[_Supervisor] = None
def start_supervisor() -> Dict[str, Any]:
    """Idempotent entrypoint invoked from hartos_bootstrap.py.

    When supervisor_should_run() says no (central deploy, operator-managed
    remote WAHA, autostart forced off, …), logs why and returns a
    'gated_off' marker. Otherwise lazily creates the singleton and
    returns its start() result.
    """
    global _supervisor
    if supervisor_should_run():
        with _supervisor_lock:
            if _supervisor is None:
                _supervisor = _Supervisor()
            return _supervisor.start()

    logger.info(
        "whatsapp_supervisor: skipped (deploy_mode=%s, autostart=%s, "
        "WHATSAPP_API_URL=%s)",
        _deploy_mode(),
        os.environ.get('WHATSAPP_AUTOSTART'),
        os.environ.get('WHATSAPP_API_URL'),
    )
    return {'running': False, 'reason': 'gated_off'}
def stop_supervisor() -> None:
    """Best-effort shutdown of the singleton (tests + bootstrap teardown).

    A no-op when start_supervisor() never created one.
    """
    with _supervisor_lock:
        current = _supervisor
        if current is None:
            return
        current.stop()
def info() -> Dict[str, Any]:
    """Supervisor state for /health endpoints and agent diagnostics.

    Returns a 'not_started' marker until start_supervisor() has created
    the singleton.
    """
    current = _supervisor
    if current is not None:
        return current.info()
    return {'running': False, 'reason': 'not_started'}