Coverage for integrations / social / api_calls.py: 35.0%
160 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — REST blueprint for voice/video/screen-share calls.
4Phase 7d. Plan reference: sunny-gliding-eich.md, Part E.4.
6Endpoints (all flag-gated by `calls_v1` server-side; off → 503 via
7the requires_flag decorator pattern from api.py):
9 POST /api/social/calls start (parent_kind, parent_id, kind)
10 GET /api/social/calls/<id> current state + participants
11 POST /api/social/calls/<id>/token issue LiveKit token (per participant)
12 POST /api/social/calls/<id>/join mark participant joined
13 POST /api/social/calls/<id>/leave leave call
14 POST /api/social/calls/<id>/end owner / parent admin only
15 GET /api/social/calls/<id>/participants current roster
16 POST /api/social/calls/<id>/agents add agent participant (requires AgentJoinGrant)
17 POST /api/social/calls/<id>/agents/<aid>/control mute/unmute the agent's mic
18 POST /api/social/agent-grants owner grants an agent join permission
19 DELETE /api/social/agent-grants/<id> owner revokes a grant
20"""
22from __future__ import annotations
24import logging
25import os
27from flask import Blueprint, request, g, jsonify
29from .auth import require_auth
30from .api import requires_flag # canonical flag-gate decorator
31from .call_service import CallService, CallError, ALLOWED_CALL_KINDS
32from .livekit_service import LiveKitService
33# Single canonical source of per-kind mesh thresholds. The bandwidth
34# model module documents the math (mesh vs SFU per-peer cost crossover
35# under residential uplink ceilings) AND owns the dict; we import it
36# here under a local alias so existing internal references keep working
37# without forcing every caller through the model module's namespace.
38from ._mesh_bandwidth_model import (
39 OPERATIONAL_THRESHOLDS as _DEFAULT_KIND_THRESHOLDS,
40)
42logger = logging.getLogger('hevolve_social')
44calls_bp = Blueprint('calls', __name__, url_prefix='/api/social')
def _ok(data, status=200):
    """Build a success response.

    Returns a ``(response, status)`` tuple whose JSON body is
    ``{'success': True, 'data': data}``.
    """
    envelope = {'success': True, 'data': data}
    return jsonify(envelope), status
def _err(message, status=400):
    """Build a failure response.

    Returns a ``(response, status)`` tuple whose JSON body is
    ``{'success': False, 'error': message}``.
    """
    envelope = {'success': False, 'error': message}
    return jsonify(envelope), status
def _get_json():
    """Return the request body as a JSON object (dict), or ``{}``.

    ``force=True`` parses the body regardless of Content-Type and
    ``silent=True`` yields ``None`` instead of raising on malformed JSON.

    Every handler in this module calls ``.get()`` on the result, so a
    syntactically valid but non-object body (array, string, number, bool)
    is treated the same as a missing body rather than being handed back
    and crashing the handler with ``AttributeError``.
    """
    payload = request.get_json(force=True, silent=True)
    return payload if isinstance(payload, dict) else {}
59# ── Call lifecycle ───────────────────────────────────────────────────
@calls_bp.route('/calls', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def start_call():
    """POST /calls — create a call session under a parent object.

    Body fields: ``parent_kind`` and ``parent_id`` (both required),
    ``kind`` (defaults to ``'voice'``; must be in ALLOWED_CALL_KINDS),
    plus optional ``title`` and ``settings``.

    Responds 201 with the created session, 400 on invalid input or a
    generic CallError, and 404 when the CallError message contains
    ``'not found'``.
    """
    payload = _get_json()
    parent_kind = payload.get('parent_kind')
    parent_id = payload.get('parent_id')
    kind = payload.get('kind') or 'voice'

    if not (parent_kind and parent_id):
        return _err('parent_kind and parent_id required')
    if kind not in ALLOWED_CALL_KINDS:
        return _err(f"unsupported call kind: {kind}")

    try:
        created = CallService.create(
            g.db,
            parent_kind,
            parent_id,
            g.user.id,
            kind=kind,
            title=payload.get('title'),
            settings=payload.get('settings'),
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except CallError as exc:
        message = str(exc)
        # 'parent not found' shape mirrors tenant isolation: 404 not 403.
        status = 404 if 'not found' in message else 400
        return _err(message, status)

    return _ok(created, status=201)
@calls_bp.route('/calls/<call_id>', methods=['GET'])
@require_auth
@requires_flag('calls_v1')
def get_call(call_id):
    """GET /calls/<call_id> — return the session plus its participants.

    Any CallError from the lookup is reported as 404 ``'not found'``.
    The roster is attached to the session under the ``'participants'``
    key before it is returned.
    """
    try:
        session = CallService.get(g.db, call_id)
    except CallError:
        return _err('not found', 404)

    roster = CallService.list_participants(g.db, call_id)
    session['participants'] = roster
    return _ok(session)
99# ── Mesh-first media-mode decision (PeerLink ↔ LiveKit complement) ──
100# By design HARTOS keeps voice/video on a P2P WebRTC mesh signaled
101# over PeerLink DISPATCH for small calls (cheap, low-latency, no SFU
102# round-trip). LiveKit only takes over when:
103# - participant count exceeds the mesh threshold (default 4 — N×N
104# mesh fanout becomes inefficient past that), OR
105# - any participant is an agent_bridge (the bridge needs a stable
106# SFU rendezvous URL to publish TTS / subscribe STT), OR
107# - the call kind is 'screen_share' or 'mixed' (multi-track → SFU
108# simpler than re-negotiating mesh tracks).
109# Override via `LIVEKIT_MESH_THRESHOLD` (single int, applies to all
110# kinds) OR per-kind: `LIVEKIT_MESH_THRESHOLD_VOICE`,
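# `LIVEKIT_MESH_THRESHOLD_VIDEO`. Values below 1 are clamped to 1, so
# setting 0 promotes every call with two or more participants; set 999
# to disable count-based promotion for testing (agent participants and
# screen_share / mixed calls still go to LiveKit).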
113#
114# The actual per-kind defaults live in _mesh_bandwidth_model.py
115# (imported above as _DEFAULT_KIND_THRESHOLDS). Voice (audio-only,
116# 32 kbps Opus) stays cheap on mesh through N=4; video (500 kbps VP8)
117# crosses the per-peer upload pain point at N=3; screen_share / mixed
118# always go SFU. See that module for the full bandwidth math.
def _mesh_threshold(kind: str = 'voice') -> int:
    """Resolve the mesh threshold for a given call kind.

    Resolution order (highest priority first):
      1. `LIVEKIT_MESH_THRESHOLD_<KIND>` env (per-kind override)
      2. `LIVEKIT_MESH_THRESHOLD` env (uniform override across kinds)
      3. _DEFAULT_KIND_THRESHOLDS[kind] (bandwidth-model default)
      4. 4 (conservative fallback for unknown kinds)

    An unset or empty per-kind variable falls through to the uniform
    one. The returned value is floored at 1 regardless of which source
    supplied it. An override that is not a valid integer is ignored and
    the (floored) default is used instead.
    """
    # Floor once so every return path below agrees on the minimum.
    floored_default = max(1, _DEFAULT_KIND_THRESHOLDS.get(kind, 4))
    override = (os.environ.get(f'LIVEKIT_MESH_THRESHOLD_{kind.upper()}')
                or os.environ.get('LIVEKIT_MESH_THRESHOLD'))
    if override is None:
        return floored_default
    try:
        return max(1, int(override))
    except (TypeError, ValueError):
        # Malformed override: fall back to the same floored default the
        # no-override path returns, not the raw table value.
        return floored_default
def _decide_media_mode(sess, participants, *, is_agent: bool) -> str:
    """Pick the media transport for a call: 'p2p_mesh' or 'livekit'.

    'livekit' is chosen when the requester is an agent, when the call
    kind is 'screen_share' or 'mixed', or when the number of active
    participants (counting the requesting user if not already listed)
    exceeds the mesh threshold for the call kind. Otherwise the call
    stays on 'p2p_mesh'.

    Reads the current user from ``g.user``.
    """
    if is_agent:
        return 'livekit'

    kind = (sess or {}).get('kind') or 'voice'
    if kind in ('screen_share', 'mixed'):
        return 'livekit'

    # Only participants without a left_at value count as active.
    present = [entry for entry in (participants or [])
               if not entry.get('left_at')]
    headcount = len(present)

    # The requester is about to join; count them if they are not listed.
    caller_id = getattr(g.user, 'id', None)
    if caller_id and not any(
            entry.get('user_id') == caller_id for entry in present):
        headcount += 1

    if headcount > _mesh_threshold(kind):
        return 'livekit'
    return 'p2p_mesh'
@calls_bp.route('/calls/<call_id>/token', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def issue_token(call_id):
    """POST /calls/<call_id>/token — hand out media credentials.

    Responds 404 if the call lookup raises CallError and 410 if the
    session has an ``ended_at`` value. Otherwise the media mode is
    decided: for 'p2p_mesh' the response describes PeerLink signaling
    (no LiveKit token is issued); for 'livekit' the result of
    ``LiveKitService.issue_token`` is returned.

    Body fields read: ``is_agent``, ``can_publish`` (default True),
    ``can_publish_screen`` (default False), ``agent_bridge_node_id``.
    """
    try:
        sess = CallService.get(g.db, call_id)
    except CallError:
        return _err('not found', 404)
    if sess.get('ended_at'):
        return _err('call has ended', 410)

    body = _get_json()
    is_agent = bool(body.get('is_agent', False))
    participants = CallService.list_participants(g.db, call_id)

    mode = _decide_media_mode(sess, participants, is_agent=is_agent)
    if mode != 'p2p_mesh':
        token = LiveKitService.issue_token(
            call_id,
            g.user.id,
            can_publish=bool(body.get('can_publish', True)),
            can_publish_screen=bool(body.get('can_publish_screen', False)),
            is_agent=is_agent,
            agent_bridge_node_id=body.get('agent_bridge_node_id'),
        )
        return _ok(token)

    # Mesh-first: small calls stay on PeerLink-signaled WebRTC mesh, so
    # the client is told to use its existing DISPATCH-channel signaling.
    kind = (sess or {}).get('kind') or 'voice'
    active_count = len([p for p in participants if not p.get('left_at')])
    threshold = _mesh_threshold(kind)
    return _ok({
        'mode': 'p2p_mesh',
        'call_id': call_id,
        'signaling': 'peerlink',
        'participant_count': active_count,
        'mesh_threshold': threshold,
    })
@calls_bp.route('/calls/<call_id>/join', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def join_call(call_id):
    """POST /calls/<call_id>/join — mark the current user as joined.

    ``device_kind`` is read from the body and defaults to ``'mobile'``.
    A CallError maps to 404 when its message contains 'not found'
    (case-insensitive) and to 400 otherwise.
    """
    body = _get_json()
    device_kind = body.get('device_kind') or 'mobile'
    try:
        participant = CallService.join(
            g.db,
            call_id,
            g.user.id,
            device_kind=device_kind,
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except CallError as exc:
        message = str(exc)
        status = 404 if 'not found' in message.lower() else 400
        return _err(message, status)
    return _ok(participant)
@calls_bp.route('/calls/<call_id>/leave', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def leave_call(call_id):
    """POST /calls/<call_id>/leave — remove the current user from a call.

    The value returned by ``CallService.leave`` is passed back to the
    client under the ``'left'`` key.
    """
    outcome = {'left': CallService.leave(g.db, call_id, g.user.id)}
    return _ok(outcome)
@calls_bp.route('/calls/<call_id>/end', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def end_call(call_id):
    """POST /calls/<call_id>/end — end a call on behalf of the current user.

    The status for a CallError is chosen from its message text: 403 if
    it mentions 'admin' or 'starter', 404 if it contains 'not found',
    and 400 otherwise.
    """
    try:
        ended = CallService.end(g.db, call_id, g.user.id)
    except CallError as exc:
        message = str(exc)
        if any(marker in message for marker in ('admin', 'starter')):
            status = 403
        elif 'not found' in message:
            status = 404
        else:
            status = 400
        return _err(message, status)
    return _ok(ended)
@calls_bp.route('/calls/<call_id>/participants', methods=['GET'])
@require_auth
@requires_flag('calls_v1', else_value=[])
def list_participants(call_id):
    """GET /calls/<call_id>/participants — return the call roster.

    Participants who have left are included only when the query string
    carries ``include_left=true`` (exact, lowercase match).
    """
    wants_left = request.args.get('include_left') == 'true'
    return _ok(CallService.list_participants(
        g.db, call_id, include_left=wants_left))
249# ── Agent join grant ─────────────────────────────────────────────────
@calls_bp.route('/agent-grants', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def create_agent_grant():
    """POST /agent-grants — grant an agent permission to join calls.

    Body fields: ``agent_id``, ``parent_kind``, ``parent_id`` (all
    required) and an optional ``scope`` that defaults to ``{}``. The
    current user is passed to the service as the granter.

    Responds 201 with the grant, or 400 on missing fields / CallError.
    """
    body = _get_json()
    agent_id = body.get('agent_id')
    parent_kind = body.get('parent_kind')
    parent_id = body.get('parent_id')
    scope = body.get('scope') or {}

    if not all((agent_id, parent_kind, parent_id)):
        return _err('agent_id, parent_kind, parent_id required')

    try:
        grant = CallService.grant_agent(
            g.db,
            agent_id,
            g.user.id,
            parent_kind,
            parent_id,
            scope,
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except CallError as exc:
        return _err(str(exc), 400)
    return _ok(grant, status=201)
@calls_bp.route('/agent-grants/<grant_id>', methods=['DELETE'])
@require_auth
@requires_flag('calls_v1')
def revoke_agent_grant(grant_id):
    """DELETE /agent-grants/<grant_id> — revoke an agent join grant.

    The status for a CallError is chosen from its message text: 403 if
    it mentions 'granter', 404 if it contains 'not found', and 400
    otherwise. On success the service result is returned under the
    ``'revoked'`` key.
    """
    try:
        outcome = CallService.revoke_agent(g.db, grant_id, g.user.id)
    except CallError as exc:
        message = str(exc)
        if 'granter' in message:
            status = 403
        elif 'not found' in message:
            status = 404
        else:
            status = 400
        return _err(message, status)
    return _ok({'revoked': outcome})
@calls_bp.route('/calls/<call_id>/agents', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def add_agent_to_call(call_id):
    """POST /calls/<call_id>/agents — attach an agent participant.

    ``agent_id`` is required in the body. Responds 201 with the
    participant returned by ``CallService.attach_agent``; a missing
    ``agent_id`` or any CallError yields 400.
    """
    agent_id = _get_json().get('agent_id')
    if not agent_id:
        return _err('agent_id required')

    try:
        participant = CallService.attach_agent(
            g.db,
            call_id,
            agent_id,
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except CallError as exc:
        return _err(str(exc), 400)
    return _ok(participant, status=201)