Coverage for integrations / social / api_calls.py: 35.0%

160 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — REST blueprint for voice/video/screen-share calls. 

3 

4Phase 7d. Plan reference: sunny-gliding-eich.md, Part E.4. 

5 

6Endpoints (all flag-gated by `calls_v1` server-side; off → 503 via 

7the requires_flag decorator pattern from api.py): 

8 

9 POST /api/social/calls start (parent_kind, parent_id, kind) 

10 GET /api/social/calls/<id> current state + participants 

11 POST /api/social/calls/<id>/token issue LiveKit token (per participant) 

12 POST /api/social/calls/<id>/join mark participant joined 

13 POST /api/social/calls/<id>/leave leave call 

14 POST /api/social/calls/<id>/end owner / parent admin only 

15 GET /api/social/calls/<id>/participants current roster 

16 POST /api/social/calls/<id>/agents add agent participant (requires AgentJoinGrant) 

17 POST /api/social/calls/<id>/agents/<aid>/control mute/unmute the agent's mic 

18 POST /api/social/agent-grants owner grants an agent join permission 

19 DELETE /api/social/agent-grants/<id> owner revokes a grant 

20""" 

21 

22from __future__ import annotations 

23 

24import logging 

25import os 

26 

27from flask import Blueprint, request, g, jsonify 

28 

29from .auth import require_auth 

30from .api import requires_flag # canonical flag-gate decorator 

31from .call_service import CallService, CallError, ALLOWED_CALL_KINDS 

32from .livekit_service import LiveKitService 

33# Single canonical source of per-kind mesh thresholds. The bandwidth 

34# model module documents the math (mesh vs SFU per-peer cost crossover 

35# under residential uplink ceilings) AND owns the dict; we import it 

36# here under a local alias so existing internal references keep working 

37# without forcing every caller through the model module's namespace. 

38from ._mesh_bandwidth_model import ( 

39 OPERATIONAL_THRESHOLDS as _DEFAULT_KIND_THRESHOLDS, 

40) 

41 

# Logger registered under the fixed name 'hevolve_social' (not __name__).
logger = logging.getLogger('hevolve_social')

# Blueprint carrying every route in this module; all paths are mounted
# beneath the /api/social prefix.
calls_bp = Blueprint(
    'calls',
    __name__,
    url_prefix='/api/social',
)

45 

46 

def _ok(data, status=200):
    """Build the success envelope for a response.

    Returns a ``(response, status)`` tuple whose JSON body is
    ``{'success': True, 'data': data}``; ``status`` defaults to 200.
    """
    envelope = {'success': True, 'data': data}
    return jsonify(envelope), status

49 

50 

def _err(message, status=400):
    """Build the failure envelope for a response.

    Returns a ``(response, status)`` tuple whose JSON body is
    ``{'success': False, 'error': message}``; ``status`` defaults to 400.
    """
    envelope = {'success': False, 'error': message}
    return jsonify(envelope), status

53 

54 

def _get_json():
    """Return the request's JSON body as a dict, or ``{}`` if unusable.

    The body is parsed with ``force=True`` (the Content-Type header is not
    required to be JSON) and ``silent=True`` (a parse failure yields
    ``None`` instead of raising).

    Every caller in this module immediately does ``data.get(...)``, so a
    body that is valid JSON but not an object (a list, string or number)
    is treated the same as a missing body rather than being returned and
    blowing up with ``AttributeError`` in the view.
    """
    payload = request.get_json(force=True, silent=True)
    # Only a JSON object supports the `.get` lookups the views rely on.
    return payload if isinstance(payload, dict) else {}

57 

58 

59# ── Call lifecycle ─────────────────────────────────────────────────── 

60 

@calls_bp.route('/calls', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def start_call():
    """Start a call attached to a parent entity (POST /api/social/calls).

    JSON body fields read here:
      parent_kind, parent_id  required
      kind                    optional, falls back to 'voice'
      title, settings         optional, forwarded untouched

    Responses:
      201  envelope around whatever ``CallService.create`` returns
      400  missing parent fields, unsupported kind, or any other CallError
      404  CallError whose message contains 'not found' (any letter case)
    """
    data = _get_json()
    parent_kind = data.get('parent_kind')
    parent_id = data.get('parent_id')
    kind = data.get('kind') or 'voice'
    if not parent_kind or not parent_id:
        return _err('parent_kind and parent_id required')
    # A non-string kind (e.g. a JSON list) can never be a valid call kind,
    # and would raise TypeError on the membership test if
    # ALLOWED_CALL_KINDS is a hashed container; reject it up front.
    if not isinstance(kind, str) or kind not in ALLOWED_CALL_KINDS:
        return _err(f"unsupported call kind: {kind}")
    try:
        sess = CallService.create(
            g.db, parent_kind, parent_id, g.user.id,
            kind=kind, title=data.get('title'),
            settings=data.get('settings'),
            tenant_id=getattr(g, 'tenant_id', None))
    except CallError as e:
        message = str(e)
        # 'parent not found' shape mirrors tenant isolation: 404 not 403.
        # Matched case-insensitively, the same way join_call does.
        status = 404 if 'not found' in message.lower() else 400
        return _err(message, status)
    return _ok(sess, status=201)

85 

86 

@calls_bp.route('/calls/<call_id>', methods=['GET'])
@require_auth
@requires_flag('calls_v1')
def get_call(call_id):
    """Return a call's current state together with its participants.

    Any ``CallError`` raised while loading the call is reported as a 404
    with the message 'not found'. On success the participant list from
    ``CallService.list_participants`` is stored under the 'participants'
    key of the call record before it is wrapped in the success envelope.
    """
    try:
        call_record = CallService.get(g.db, call_id)
    except CallError:
        return _err('not found', 404)

    roster = CallService.list_participants(g.db, call_id)
    call_record['participants'] = roster
    return _ok(call_record)

97 

98 

99# ── Mesh-first media-mode decision (PeerLink ↔ LiveKit complement) ── 

100# By design HARTOS keeps voice/video on a P2P WebRTC mesh signaled 

101# over PeerLink DISPATCH for small calls (cheap, low-latency, no SFU 

102# round-trip). LiveKit only takes over when: 

103# - participant count exceeds the mesh threshold (default 4 — N×N 

104# mesh fanout becomes inefficient past that), OR 

105# - any participant is an agent_bridge (the bridge needs a stable 

106# SFU rendezvous URL to publish TTS / subscribe STT), OR 

107# - the call kind is 'screen_share' or 'mixed' (multi-track → SFU 

108# simpler than re-negotiating mesh tracks). 

109# Override via `LIVEKIT_MESH_THRESHOLD` (single int, applies to all 

110# kinds) OR per-kind: `LIVEKIT_MESH_THRESHOLD_VOICE`, 

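111# `LIVEKIT_MESH_THRESHOLD_VIDEO`. Values below 1 are clamped to 1, so 0 promotes as soon as a second participant is counted; 999 

112# effectively disables count-based promotion for testing (agent and screen_share / mixed calls still go to LiveKit). 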

113# 

114# The actual per-kind defaults live in _mesh_bandwidth_model.py 

115# (imported above as _DEFAULT_KIND_THRESHOLDS). Voice (audio-only, 

116# 32 kbps Opus) stays cheap on mesh through N=4; video (500 kbps VP8) 

117# crosses the per-peer upload pain point at N=3; screen_share / mixed 

118# always go SFU. See that module for the full bandwidth math. 

119 

120 

def _mesh_threshold(kind: str = 'voice') -> int:
    """Resolve the mesh threshold for a given call kind.

    Resolution order (highest priority first):
      1. `LIVEKIT_MESH_THRESHOLD_<KIND>` env (per-kind override)
      2. `LIVEKIT_MESH_THRESHOLD` env (uniform override across kinds)
      3. _DEFAULT_KIND_THRESHOLDS[kind] (bandwidth-model default)
      4. 4 (conservative fallback for unknown kinds)

    An empty per-kind variable is treated as unset and falls through to
    the uniform one. The result is always clamped to at least 1, on every
    path: previously an unparseable env value returned the raw default,
    which bypassed the clamp that the "unset" path applied.

    Args:
        kind: Call kind; upper-cased to build the per-kind env var name.

    Returns:
        The participant count above which a call is promoted off the mesh.
    """
    # Clamp once so the "unset" and "malformed override" paths agree.
    fallback = max(1, _DEFAULT_KIND_THRESHOLDS.get(kind, 4))
    raw = (os.environ.get(f'LIVEKIT_MESH_THRESHOLD_{kind.upper()}')
           or os.environ.get('LIVEKIT_MESH_THRESHOLD'))
    if raw is None:
        return fallback
    try:
        return max(1, int(raw))
    except (TypeError, ValueError):
        # Malformed override: behave exactly as if it were not set.
        return fallback

139 

140 

def _decide_media_mode(sess, participants, *, is_agent: bool) -> str:
    """Pick the media transport for a token request.

    Returns 'livekit' when the requester is an agent, when the call kind
    is 'screen_share' or 'mixed', or when the active head count exceeds
    ``_mesh_threshold(kind)``; otherwise returns 'p2p_mesh'.

    The head count is the number of participant rows with a falsy
    'left_at', plus one for the current ``g.user`` when no active row
    carries their id under 'user_id'. A missing session or kind is
    treated as a 'voice' call.

    NOTE(review): only the requester's ``is_agent`` flag is consulted;
    rows already in ``participants`` are not inspected for agents here.
    """
    if is_agent:
        return 'livekit'

    call_kind = (sess or {}).get('kind') or 'voice'
    if call_kind in ('screen_share', 'mixed'):
        return 'livekit'

    # Rows that have not left yet (falsy left_at).
    still_present = [
        row for row in (participants or []) if not row.get('left_at')
    ]
    head_count = len(still_present)

    # The requester counts too, unless they already hold an active row.
    requester_id = getattr(g.user, 'id', None)
    if requester_id and not any(
            row.get('user_id') == requester_id for row in still_present):
        head_count += 1

    if head_count > _mesh_threshold(call_kind):
        return 'livekit'
    return 'p2p_mesh'

163 

164 

@calls_bp.route('/calls/<call_id>/token', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def issue_token(call_id):
    """Hand the caller either mesh instructions or a LiveKit token.

    Responses:
      404  the call could not be loaded (any CallError)
      410  the call record has a truthy 'ended_at'
      200  when ``_decide_media_mode`` picks 'p2p_mesh': a payload with
           mode, call_id, signaling='peerlink', the number of rows with a
           falsy 'left_at', and the threshold for the call kind;
           otherwise whatever ``LiveKitService.issue_token`` returns.

    JSON body fields read: is_agent, can_publish (default True),
    can_publish_screen (default False), agent_bridge_node_id.

    NOTE(review): ``is_agent`` comes straight from the request body and
    nothing in this function checks that the caller belongs to the call;
    confirm that is enforced elsewhere.
    NOTE(review): 'participant_count' does not add the caller, whereas the
    mode decision does when they hold no active row; confirm clients
    expect that.
    """
    try:
        call_record = CallService.get(g.db, call_id)
    except CallError:
        return _err('not found', 404)
    if call_record.get('ended_at'):
        return _err('call has ended', 410)

    body = _get_json()
    agent_requested = bool(body.get('is_agent', False))
    roster = CallService.list_participants(g.db, call_id)

    chosen_mode = _decide_media_mode(
        call_record, roster, is_agent=agent_requested)

    if chosen_mode != 'p2p_mesh':
        # Promoted call: LiveKit issues the credentials.
        livekit_token = LiveKitService.issue_token(
            call_id, g.user.id,
            can_publish=bool(body.get('can_publish', True)),
            can_publish_screen=bool(body.get('can_publish_screen', False)),
            is_agent=agent_requested,
            agent_bridge_node_id=body.get('agent_bridge_node_id'))
        return _ok(livekit_token)

    # Mesh-first: the client is told to signal over PeerLink itself and
    # no LiveKit token is issued.
    call_kind = (call_record or {}).get('kind') or 'voice'
    present = sum(1 for row in roster if not row.get('left_at'))
    return _ok({
        'mode': 'p2p_mesh',
        'call_id': call_id,
        'signaling': 'peerlink',
        'participant_count': present,
        'mesh_threshold': _mesh_threshold(call_kind),
    })

201 

202 

@calls_bp.route('/calls/<call_id>/join', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def join_call(call_id):
    """Mark the current user as joined to a call.

    The optional 'device_kind' body field falls back to 'mobile'. A
    ``CallError`` becomes a 404 when its message contains 'not found'
    (case-insensitive) and a 400 otherwise; on success the value returned
    by ``CallService.join`` is wrapped in the success envelope.
    """
    body = _get_json()
    device = body.get('device_kind') or 'mobile'
    tenant = getattr(g, 'tenant_id', None)
    try:
        participant = CallService.join(
            g.db, call_id, g.user.id,
            device_kind=device,
            tenant_id=tenant)
    except CallError as exc:
        if 'not found' in str(exc).lower():
            status = 404
        else:
            status = 400
        return _err(str(exc), status)
    return _ok(participant)

216 

217 

@calls_bp.route('/calls/<call_id>/leave', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def leave_call(call_id):
    """Record that the current user left the call.

    Responds with ``{'left': <result of CallService.leave>}`` inside the
    success envelope.

    NOTE(review): unlike the sibling endpoints, CallError is not caught
    here; confirm ``CallService.leave`` cannot raise it.
    """
    outcome = CallService.leave(g.db, call_id, g.user.id)
    payload = {'left': outcome}
    return _ok(payload)

224 

225 

@calls_bp.route('/calls/<call_id>/end', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def end_call(call_id):
    """End a call on behalf of the current user.

    The status code is derived from the ``CallError`` message, matched
    case-insensitively (the same way ``join_call`` matches), in this order:
      403  message mentions 'admin' or 'starter'
      404  message mentions 'not found'
      400  anything else
    On success the value returned by ``CallService.end`` is wrapped in the
    success envelope.
    """
    try:
        sess = CallService.end(g.db, call_id, g.user.id)
    except CallError as e:
        message = str(e)
        lowered = message.lower()
        # Permission failures take precedence over 'not found'.
        if 'admin' in lowered or 'starter' in lowered:
            return _err(message, 403)
        return _err(message, 404 if 'not found' in lowered else 400)
    return _ok(sess)

237 

238 

@calls_bp.route('/calls/<call_id>/participants', methods=['GET'])
@require_auth
@requires_flag('calls_v1', else_value=[])
def list_participants(call_id):
    """Return the roster for a call.

    Participants who already left are requested only when the query
    string carries exactly ``include_left=true``; any other value (or
    none) passes ``include_left=False`` to the service.

    NOTE(review): CallError is not caught here; confirm
    ``CallService.list_participants`` cannot raise it for an unknown call.
    """
    wants_departed = request.args.get('include_left') == 'true'
    roster = CallService.list_participants(
        g.db, call_id, include_left=wants_departed)
    return _ok(roster)

247 

248 

249# ── Agent join grant ───────────────────────────────────────────────── 

250 

@calls_bp.route('/agent-grants', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def create_agent_grant():
    """Create a grant allowing an agent to join calls under a parent.

    Body fields 'agent_id', 'parent_kind' and 'parent_id' are required;
    'scope' is optional and falls back to an empty dict. The current user
    is passed to ``CallService.grant_agent`` as the granting party.

    Responses: 201 with the grant on success, 400 when a required field
    is missing or the service raises ``CallError``.
    """
    body = _get_json()
    agent_id, parent_kind, parent_id = (
        body.get(field)
        for field in ('agent_id', 'parent_kind', 'parent_id'))
    if not (agent_id and parent_kind and parent_id):
        return _err('agent_id, parent_kind, parent_id required')

    scope = body.get('scope') or {}
    tenant = getattr(g, 'tenant_id', None)
    try:
        grant = CallService.grant_agent(
            g.db, agent_id, g.user.id, parent_kind, parent_id, scope,
            tenant_id=tenant)
    except CallError as exc:
        return _err(str(exc), 400)
    return _ok(grant, status=201)

269 

270 

@calls_bp.route('/agent-grants/<grant_id>', methods=['DELETE'])
@require_auth
@requires_flag('calls_v1')
def revoke_agent_grant(grant_id):
    """Revoke an agent join grant on behalf of the current user.

    The status code is derived from the ``CallError`` message, matched
    case-insensitively (the same way ``join_call`` matches), in this order:
      403  message mentions 'granter'
      404  message mentions 'not found'
      400  anything else
    On success responds with ``{'revoked': <service result>}``.
    """
    try:
        revoked = CallService.revoke_agent(g.db, grant_id, g.user.id)
    except CallError as e:
        message = str(e)
        lowered = message.lower()
        # Permission failures take precedence over 'not found'.
        if 'granter' in lowered:
            return _err(message, 403)
        return _err(message, 404 if 'not found' in lowered else 400)
    return _ok({'revoked': revoked})

282 

283 

@calls_bp.route('/calls/<call_id>/agents', methods=['POST'])
@require_auth
@requires_flag('calls_v1')
def add_agent_to_call(call_id):
    """Attach an agent participant to a call.

    The body must carry 'agent_id'. Responds 201 with the value returned
    by ``CallService.attach_agent``; a missing 'agent_id' or any
    ``CallError`` from the service is reported as a 400.

    NOTE(review): the requesting user's id is not passed to the service
    here; presumably grant checks happen inside ``attach_agent``. Confirm.
    """
    body = _get_json()
    agent_id = body.get('agent_id')
    if not agent_id:
        return _err('agent_id required')

    tenant = getattr(g, 'tenant_id', None)
    try:
        participant = CallService.attach_agent(
            g.db, call_id, agent_id,
            tenant_id=tenant)
    except CallError as exc:
        return _err(str(exc), 400)
    return _ok(participant, status=201)