Coverage for integrations / social / call_service.py: 24.7%

162 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Voice/video/screen-share call service. 

3 

4Phase 7d. Plan reference: sunny-gliding-eich.md, Part E.4 + E.7 + E.12. 

5 

6This is the BACKEND state machine for call sessions — REST surface 

7for create / join / leave / end / invite, plus the AgentJoinGrant 

8gate for agents joining as participants. 

9 

10What lives here: 

11 - CallService.create / get / end — CallSession bookkeeping. 

12 - CallService.join / leave — CallParticipant lifecycle, with 

13 UNIQUE-WHERE-left_at-IS-NULL invariant ensuring exactly one 

14 active participant row per (call, user). 

15 - CallService.list_participants — current roster. 

16 - CallService.invite_member — Notification fan-out for non-muted 

17 members of the parent (community / conversation). 

18 - CallService.attach_agent — verifies AgentJoinGrant.can_voice 

19 before allowing the AgentVoiceBridge to spin up. 

20 

21What does NOT live here (deferred to LiveKit + AgentVoiceBridge): 

22 - Media frames (WebRTC P2P mesh / LiveKit SFU live in their own 

23 transports; this service only emits signaling state). 

24 - Audio mixing / TTS publish / Whisper STT for agents — the bridge 

25 is a separate per-call worker; this service just registers 

26 bookkeeping for it. 

27 

28Transport: P2P-first via core.peer_link.message_bus.MessageBus.publish. 

29Falls back to WAMP push for offline recipients and HTTP API for sync. 

30Central is the audit log + discovery index, not the primary router. 

31 

32Existing fan-out order (do not bypass): 

33 LOCAL → SSE → PEERLINK → CROSSBAR 

34 

35This module ONLY persists rows in the canonical DB and calls 

36MessageBus.publish() — it does NOT directly emit WAMP or open 

37PeerLink frames. Transport selection is owned by message_bus.py. 

38""" 

39 

40from __future__ import annotations 

41 

42import json 

43import logging 

44import uuid 

45from datetime import datetime 

46from typing import Any, Dict, List, Optional, Tuple 

47 

48from sqlalchemy import text 

49 

50logger = logging.getLogger('hevolve_social') 

51 

52 

53# Whitelist enforced at the service layer so a malicious client cannot 

54# coax the DB into accepting a kind not understood by clients. 

55ALLOWED_CALL_KINDS = ('voice', 'video', 'screen_share', 'mixed', 'stage') 

56 

57# Default per-room cap. Above 4 participants, mesh efficiency drops 

58# and LiveKit SFU is auto-promoted (Plan R.6). Hard cap configurable 

59# per-call via settings.max_participants. 

60DEFAULT_MAX_PARTICIPANTS = 32 

61 

62 

63class CallError(Exception): 

64 """Raised for service-level call failures. api_calls.py maps 

65 these to 4xx HTTP responses with the message in the error body.""" 

66 

67 

68class CallService: 

69 

70 # ── Session lifecycle ──────────────────────────────────────────── 

71 

72 @staticmethod 

73 def create(db, parent_kind: str, parent_id: str, started_by: str, 

74 kind: str = 'voice', title: Optional[str] = None, 

75 max_participants: int = DEFAULT_MAX_PARTICIPANTS, 

76 settings: Optional[Dict[str, Any]] = None, 

77 tenant_id: Optional[str] = None) -> Dict[str, Any]: 

78 """Create a CallSession row. 

79 

80 The starter must be a member of the parent (community or 

81 conversation) — gated by Membership lookup. Non-members 

82 raise CallError so we don't leak the existence of the parent. 

83 

84 Idempotency: a parent already has-an-active-call short-circuits 

85 and returns the existing row instead of creating a duplicate. 

86 Two clients racing on the Start Call button get the same call, 

87 which is what users expect. 

88 """ 

89 if kind not in ALLOWED_CALL_KINDS: 

90 raise CallError(f"unsupported call kind: {kind!r}") 

91 if parent_kind not in ('community', 'conversation'): 

92 raise CallError(f"unsupported parent_kind: {parent_kind!r}") 

93 

94 if not _is_parent_member(db, parent_kind, parent_id, started_by): 

95 # 404-shape from caller — do not reveal existence of the parent. 

96 raise CallError("parent not found") 

97 

98 # Idempotent: return existing active call for this parent. 

99 # Pass-4 P4-1 fix: SELECT-then-INSERT under READ COMMITTED is 

100 # racy on Postgres — two concurrent starters could both pass 

101 # the SELECT and both INSERT. Use a SAVEPOINT around the 

102 # INSERT so an IntegrityError on a future partial-unique- 

103 # index parent_active row falls back to re-SELECT. The 

104 # partial index isn't in the migration today (deferred to 

105 # Phase 7d.B alongside the MySQL workaround for #2), but the 

106 # try/except on insert + recheck closes the window in code. 

107 existing = db.execute(text( 

108 "SELECT id FROM call_sessions " 

109 "WHERE parent_kind = :pk AND parent_id = :pid " 

110 "AND ended_at IS NULL " 

111 "ORDER BY started_at DESC LIMIT 1"), 

112 {'pk': parent_kind, 'pid': parent_id} 

113 ).fetchone() 

114 if existing is not None: 

115 return CallService.get(db, existing[0]) 

116 

117 call_id = str(uuid.uuid4()) 

118 try: 

119 with db.begin_nested(): 

120 db.execute(text( 

121 "INSERT INTO call_sessions " 

122 "(id, tenant_id, parent_kind, parent_id, title, " 

123 " kind, started_by, max_participants, settings) " 

124 "VALUES " 

125 "(:id, :tid, :pk, :pid, :title, :kind, :sb, :mp, :s)"), 

126 {'id': call_id, 'tid': tenant_id, 'pk': parent_kind, 

127 'pid': parent_id, 'title': title, 'kind': kind, 

128 'sb': started_by, 'mp': max_participants, 

129 's': json.dumps(settings) if settings else None}) 

130 except Exception as e: 

131 # Concurrent starter beat us to the INSERT — re-SELECT 

132 # the now-active call and return it. Reviewer P4-1. 

133 logger.info( 

134 "CallService.create concurrent INSERT collision " 

135 "for parent=%s/%s: %s; re-reading", parent_kind, 

136 parent_id, e) 

137 existing = db.execute(text( 

138 "SELECT id FROM call_sessions " 

139 "WHERE parent_kind = :pk AND parent_id = :pid " 

140 "AND ended_at IS NULL " 

141 "ORDER BY started_at DESC LIMIT 1"), 

142 {'pk': parent_kind, 'pid': parent_id} 

143 ).fetchone() 

144 if existing is not None: 

145 return CallService.get(db, existing[0]) 

146 raise CallError(f"could not create call: {e}") 

147 db.commit() 

148 

149 # Auto-add the starter as the first participant so the roster 

150 # query reflects them immediately. 

151 CallService.join(db, call_id, started_by, 

152 device_kind='mobile', tenant_id=tenant_id) 

153 return CallService.get(db, call_id) 

154 

155 @staticmethod 

156 def get(db, call_id: str) -> Dict[str, Any]: 

157 row = db.execute(text( 

158 "SELECT id, parent_kind, parent_id, title, kind, started_by, " 

159 " started_at, ended_at, livekit_room_sid, " 

160 " max_participants, settings " 

161 "FROM call_sessions WHERE id = :id"), 

162 {'id': call_id} 

163 ).fetchone() 

164 if row is None: 

165 raise CallError("not found") 

166 return _row_to_dict(row) 

167 

168 @staticmethod 

169 def end(db, call_id: str, ended_by_id: str) -> Dict[str, Any]: 

170 """End the call. Only the starter or a parent admin can end. 

171 

172 Pass-4 P4-4: both UPDATEs run inside a SAVEPOINT so a mid- 

173 operation failure rolls back BOTH (no half-ended state where 

174 the session is closed but participants stay active). Order 

175 is participants → session so worst-case partial commit leaves 

176 a still-active session with no active participants — easy to 

177 re-end manually — rather than the inverse (closed session 

178 with ghost participants). 

179 """ 

180 sess = CallService.get(db, call_id) 

181 if sess.get('ended_at'): 

182 return sess # idempotent — already ended 

183 is_starter = sess['started_by'] == ended_by_id 

184 is_admin = _is_parent_admin( 

185 db, sess['parent_kind'], sess['parent_id'], ended_by_id) 

186 if not (is_starter or is_admin): 

187 raise CallError("only the starter or an admin can end the call") 

188 with db.begin_nested(): 

189 db.execute(text( 

190 "UPDATE call_participants SET left_at = CURRENT_TIMESTAMP " 

191 "WHERE call_id = :id AND left_at IS NULL"), 

192 {'id': call_id}) 

193 db.execute(text( 

194 "UPDATE call_sessions SET ended_at = CURRENT_TIMESTAMP " 

195 "WHERE id = :id AND ended_at IS NULL"), 

196 {'id': call_id}) 

197 db.commit() 

198 return CallService.get(db, call_id) 

199 

200 # ── Participant lifecycle ──────────────────────────────────────── 

201 

202 @staticmethod 

203 def join(db, call_id: str, user_id: str, 

204 device_kind: str = 'mobile', agent_kind: str = 'human', 

205 tenant_id: Optional[str] = None) -> Dict[str, Any]: 

206 """Mark a user as joined. Idempotent — re-joining an active 

207 participant returns the existing row (no duplicate insert).""" 

208 sess = CallService.get(db, call_id) 

209 if sess.get('ended_at'): 

210 raise CallError("call has ended") 

211 

212 # Membership check: only members of the parent can join. 

213 if not _is_parent_member(db, sess['parent_kind'], 

214 sess['parent_id'], user_id): 

215 raise CallError("not a member of this room's parent") 

216 

217 # Idempotency: existing active participant row? 

218 active = db.execute(text( 

219 "SELECT id FROM call_participants " 

220 "WHERE call_id = :cid AND user_id = :uid AND left_at IS NULL"), 

221 {'cid': call_id, 'uid': user_id} 

222 ).fetchone() 

223 if active is not None: 

224 return CallService._participant_dict(db, active[0]) 

225 

226 pid = str(uuid.uuid4()) 

227 db.execute(text( 

228 "INSERT INTO call_participants " 

229 "(id, tenant_id, call_id, user_id, agent_kind, " 

230 " device_kind) " 

231 "VALUES (:id, :tid, :cid, :uid, :ak, :dk)"), 

232 {'id': pid, 'tid': tenant_id, 'cid': call_id, 

233 'uid': user_id, 'ak': agent_kind, 'dk': device_kind}) 

234 db.commit() 

235 return CallService._participant_dict(db, pid) 

236 

237 @staticmethod 

238 def leave(db, call_id: str, user_id: str) -> bool: 

239 """Mark all this user's active participant rows as left. Idempotent.""" 

240 result = db.execute(text( 

241 "UPDATE call_participants SET left_at = CURRENT_TIMESTAMP " 

242 "WHERE call_id = :cid AND user_id = :uid AND left_at IS NULL"), 

243 {'cid': call_id, 'uid': user_id}) 

244 db.commit() 

245 return (result.rowcount or 0) > 0 

246 

247 @staticmethod 

248 def list_participants(db, call_id: str, 

249 include_left: bool = False) -> List[Dict[str, Any]]: 

250 clause = "" if include_left else " AND left_at IS NULL" 

251 rows = db.execute(text( 

252 "SELECT id, user_id, agent_kind, joined_at, left_at, " 

253 " is_muted, is_video_on, is_screen_sharing, device_kind " 

254 f"FROM call_participants WHERE call_id = :cid{clause} " 

255 "ORDER BY joined_at ASC"), 

256 {'cid': call_id} 

257 ).fetchall() 

258 return [ 

259 {'id': r[0], 'user_id': r[1], 'agent_kind': r[2], 

260 'joined_at': str(r[3]) if r[3] else None, 

261 'left_at': str(r[4]) if r[4] else None, 

262 'is_muted': bool(r[5]), 'is_video_on': bool(r[6]), 

263 'is_screen_sharing': bool(r[7]), 'device_kind': r[8]} 

264 for r in rows 

265 ] 

266 

267 # ── Agent join grant ──────────────────────────────────────────── 

268 

269 @staticmethod 

270 def grant_agent(db, agent_id: str, owner_id: str, 

271 parent_kind: str, parent_id: str, 

272 scope: Dict[str, Any], 

273 source: str = 'user_explicit', 

274 tenant_id: Optional[str] = None) -> Dict[str, Any]: 

275 """Owner grants an agent permission to join a parent. 

276 

277 Idempotent on (agent_id, parent_kind, parent_id) — a re-grant 

278 with the same triple updates the scope rather than inserting 

279 a duplicate. This matches what the user expects when they 

280 toggle scopes in the agent settings UI. 

281 """ 

282 # Verify the caller owns the agent — the agent's User row must 

283 # have user_type='agent' AND owner_id matching owner_id arg. 

284 # (SocialUser uses `owner_id` for the agent→owner link; 

285 # Plan C.3 calls it agent_owner_id but the live schema is 

286 # owner_id. Same semantic.) 

287 # Pass-4 P4-3: also pull is_admin so we can gate ownerless 

288 # (system) agent grants — see below. 

289 agent_row = db.execute(text( 

290 "SELECT user_type, owner_id FROM users WHERE id = :id"), 

291 {'id': agent_id} 

292 ).fetchone() 

293 if not agent_row: 

294 raise CallError("agent not found") 

295 if agent_row[0] != 'agent': 

296 raise CallError("user is not an agent") 

297 agent_owner = agent_row[1] 

298 if agent_owner is None: 

299 # Pass-4 P4-3 fix: system / ownerless agents must NOT be 

300 # granted by an arbitrary user. Previously the 

301 # `agent_row[1] is not None` short-circuit allowed any 

302 # authenticated user to issue can_voice / can_screen 

303 # grants for a system agent — security hole. Require 

304 # caller to be a platform admin OR raise. 

305 caller_row = db.execute(text( 

306 "SELECT is_admin FROM users WHERE id = :id"), 

307 {'id': owner_id} 

308 ).fetchone() 

309 if not caller_row or not caller_row[0]: 

310 raise CallError( 

311 "system agent grants require platform admin") 

312 elif agent_owner != owner_id: 

313 raise CallError("only the agent's owner can grant join") 

314 

315 # Existing active grant? Update scope in place. 

316 existing = db.execute(text( 

317 "SELECT id FROM agent_join_grants " 

318 "WHERE agent_id = :aid AND parent_kind = :pk " 

319 "AND parent_id = :pid AND revoked_at IS NULL"), 

320 {'aid': agent_id, 'pk': parent_kind, 'pid': parent_id} 

321 ).fetchone() 

322 scope_json = json.dumps(scope or {}) 

323 if existing is not None: 

324 db.execute(text( 

325 "UPDATE agent_join_grants SET scope = :s WHERE id = :id"), 

326 {'s': scope_json, 'id': existing[0]}) 

327 db.commit() 

328 return CallService._grant_dict(db, existing[0]) 

329 

330 gid = str(uuid.uuid4()) 

331 db.execute(text( 

332 "INSERT INTO agent_join_grants " 

333 "(id, tenant_id, agent_id, owner_id, parent_kind, " 

334 " parent_id, scope, source) " 

335 "VALUES (:id, :tid, :aid, :oid, :pk, :pid, :s, :src)"), 

336 {'id': gid, 'tid': tenant_id, 'aid': agent_id, 

337 'oid': owner_id, 'pk': parent_kind, 'pid': parent_id, 

338 's': scope_json, 'src': source}) 

339 db.commit() 

340 return CallService._grant_dict(db, gid) 

341 

342 @staticmethod 

343 def revoke_agent(db, grant_id: str, revoker_id: str) -> bool: 

344 """Owner revokes an active grant. Idempotent.""" 

345 row = db.execute(text( 

346 "SELECT owner_id, revoked_at FROM agent_join_grants " 

347 "WHERE id = :id"), 

348 {'id': grant_id} 

349 ).fetchone() 

350 if not row: 

351 raise CallError("grant not found") 

352 if row[0] != revoker_id: 

353 raise CallError("only the granter can revoke") 

354 if row[1] is not None: 

355 return False # already revoked 

356 db.execute(text( 

357 "UPDATE agent_join_grants SET revoked_at = CURRENT_TIMESTAMP " 

358 "WHERE id = :id"), 

359 {'id': grant_id}) 

360 db.commit() 

361 return True 

362 

363 @staticmethod 

364 def get_active_grant(db, agent_id: str, 

365 parent_kind: str, parent_id: str 

366 ) -> Optional[Dict[str, Any]]: 

367 row = db.execute(text( 

368 "SELECT id FROM agent_join_grants " 

369 "WHERE agent_id = :aid AND parent_kind = :pk " 

370 "AND parent_id = :pid AND revoked_at IS NULL " 

371 "ORDER BY granted_at DESC LIMIT 1"), 

372 {'aid': agent_id, 'pk': parent_kind, 'pid': parent_id} 

373 ).fetchone() 

374 if row is None: 

375 return None 

376 return CallService._grant_dict(db, row[0]) 

377 

378 @staticmethod 

379 def attach_agent(db, call_id: str, agent_id: str, 

380 tenant_id: Optional[str] = None) -> Dict[str, Any]: 

381 """Add an agent as a call_participant. Requires an active 

382 AgentJoinGrant on the parent with `can_voice = True` for 

383 kind='voice' / 'mixed' (or `can_screen` for screen_share). 

384 

385 Phase 7d.B: also spawns the AgentVoiceBridge worker so the 

386 agent actually gets STT + TTS plumbed through LiveKit (or 

387 no-op-stays-bookkeeping when livekit-rtc isn't installed). 

388 Plan E.12. Bridge spawn is best-effort — a failure to spin 

389 up the bridge does NOT fail the participant record (the 

390 roster row stays so admins can see the agent + retry attach). 

391 """ 

392 sess = CallService.get(db, call_id) 

393 grant = CallService.get_active_grant( 

394 db, agent_id, sess['parent_kind'], sess['parent_id']) 

395 if not grant: 

396 raise CallError("no active join grant for this agent + parent") 

397 scope = grant['scope'] or {} 

398 if sess['kind'] in ('voice', 'mixed') and not scope.get('can_voice'): 

399 raise CallError("grant does not include can_voice") 

400 if sess['kind'] == 'screen_share' and not scope.get('can_screen'): 

401 raise CallError("grant does not include can_screen") 

402 # Reuse join() with agent_kind='agent' + device_kind='agent_bridge' 

403 participant = CallService.join( 

404 db, call_id, agent_id, 

405 agent_kind='agent', device_kind='agent_bridge', 

406 tenant_id=tenant_id) 

407 # Phase 7d.B — spin up the AgentVoiceBridge worker. 

408 try: 

409 from .agent_voice_bridge import AgentVoiceBridge 

410 AgentVoiceBridge.attach_agent( 

411 db, call_id=call_id, agent_id=agent_id, 

412 owner_id=grant['owner_id'], scope=scope) 

413 except Exception as e: 

414 logger.warning( 

415 "CallService.attach_agent: bridge spawn failed for " 

416 "(call=%s, agent=%s): %s; participant row kept", 

417 call_id, agent_id, e) 

418 return participant 

419 

420 # ── Internal helpers ──────────────────────────────────────────── 

421 

422 @staticmethod 

423 def _participant_dict(db, participant_id: str) -> Dict[str, Any]: 

424 row = db.execute(text( 

425 "SELECT id, call_id, user_id, agent_kind, joined_at, " 

426 " left_at, is_muted, is_video_on, is_screen_sharing, " 

427 " device_kind " 

428 "FROM call_participants WHERE id = :id"), 

429 {'id': participant_id} 

430 ).fetchone() 

431 if not row: 

432 return {} 

433 return { 

434 'id': row[0], 'call_id': row[1], 'user_id': row[2], 

435 'agent_kind': row[3], 

436 'joined_at': str(row[4]) if row[4] else None, 

437 'left_at': str(row[5]) if row[5] else None, 

438 'is_muted': bool(row[6]), 'is_video_on': bool(row[7]), 

439 'is_screen_sharing': bool(row[8]), 'device_kind': row[9], 

440 } 

441 

442 @staticmethod 

443 def _grant_dict(db, grant_id: str) -> Dict[str, Any]: 

444 row = db.execute(text( 

445 "SELECT id, agent_id, owner_id, parent_kind, parent_id, " 

446 " scope, granted_at, revoked_at, source " 

447 "FROM agent_join_grants WHERE id = :id"), 

448 {'id': grant_id} 

449 ).fetchone() 

450 if not row: 

451 return {} 

452 try: 

453 scope = json.loads(row[5]) if row[5] else {} 

454 except Exception: 

455 scope = {} 

456 return { 

457 'id': row[0], 'agent_id': row[1], 'owner_id': row[2], 

458 'parent_kind': row[3], 'parent_id': row[4], 

459 'scope': scope, 

460 'granted_at': str(row[6]) if row[6] else None, 

461 'revoked_at': str(row[7]) if row[7] else None, 

462 'source': row[8], 

463 } 

464 

465 

466def _is_parent_member(db, parent_kind: str, parent_id: str, 

467 user_id: str) -> bool: 

468 """Membership check via the polymorphic memberships table. 

469 

470 Fall back to the legacy community_memberships table for backward 

471 compat — Plan P.2 dual-write contract during the v40→v48 transition. 

472 """ 

473 row = db.execute(text( 

474 "SELECT 1 FROM memberships " 

475 "WHERE parent_kind = :pk AND parent_id = :pid " 

476 "AND member_id = :uid LIMIT 1"), 

477 {'pk': parent_kind, 'pid': parent_id, 'uid': user_id} 

478 ).fetchone() 

479 if row is not None: 

480 return True 

481 if parent_kind == 'community': 

482 row = db.execute(text( 

483 "SELECT 1 FROM community_memberships " 

484 "WHERE community_id = :pid AND user_id = :uid LIMIT 1"), 

485 {'pid': parent_id, 'uid': user_id} 

486 ).fetchone() 

487 return row is not None 

488 return False 

489 

490 

491def _is_parent_admin(db, parent_kind: str, parent_id: str, 

492 user_id: str) -> bool: 

493 row = db.execute(text( 

494 "SELECT 1 FROM memberships " 

495 "WHERE parent_kind = :pk AND parent_id = :pid " 

496 "AND member_id = :uid AND role IN ('admin', 'owner', 'mod') " 

497 "LIMIT 1"), 

498 {'pk': parent_kind, 'pid': parent_id, 'uid': user_id} 

499 ).fetchone() 

500 return row is not None 

501 

502 

503def _row_to_dict(row) -> Dict[str, Any]: 

504 try: 

505 settings = json.loads(row[10]) if row[10] else {} 

506 except Exception: 

507 settings = {} 

508 return { 

509 'id': row[0], 'parent_kind': row[1], 'parent_id': row[2], 

510 'title': row[3], 'kind': row[4], 'started_by': row[5], 

511 'started_at': str(row[6]) if row[6] else None, 

512 'ended_at': str(row[7]) if row[7] else None, 

513 'livekit_room_sid': row[8], 'max_participants': row[9], 

514 'settings': settings, 

515 } 

516 

517 

518__all__ = ['CallService', 'CallError', 'ALLOWED_CALL_KINDS']